Version: 0.6

Redis

1. Scenarios

1.1 Use Flink SQL to sync MySQL data into Redis;
1.2 Use Flink SQL to join Kafka data with Redis;

2. Components

Component | Version
Flink | 1.14.4
Flink-mysql-cdc | 2.2.1
Redis | 6.2.4
MySQL | 5.7+
Dinky | 0.6.6
commons-pool2 | 2.11.0
jedis | 3.3.0
flink-connector-redis | 1.0.11
flink-sql-connector-kafka_2.12 | 1.14.4

Tip

The commons-pool2 and jedis packages are referenced by the flink-connector-redis jar, which is why they are added here; both can be downloaded from Maven repositories, or you can build them from source yourself. The jedis version the connector was built against is fairly old and newer RC releases exist, so to be safe, if you deploy to a production environment you should build flink-connector-redis yourself. GitHub: https://github.com/jeff-zou/flink-connector-redis. Thanks to jeff-zou for contributing this connector!

3. Deployment

Add commons-pool2-2.11.0.jar, jedis-3.3.0.jar, flink-connector-redis-1.0.11.jar, and flink-sql-connector-kafka_2.12-1.14.4.jar under Dlink's plugins directory, then restart Dlink. For Flink standalone or YARN mode, add the dependencies wherever they are needed.

Dependency diagram

Scenario 1:

1.1 Prepare the data to be synced: the tables employees_1 and employees_2 in database emp_1

create database emp_1;

CREATE TABLE IF NOT EXISTS `employees_1` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");

CREATE TABLE IF NOT EXISTS `employees_2` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into employees_2 VALUES ("20", "1987-01-23", "huang", "menji", "M", "2000-03-04");
insert into employees_2 VALUES ("21", "1993-04-21", "lu", "benweiniubi", "M", "2022-05-06");

1.2 Flink SQL statements

SET execution.checkpointing.interval = 10s;
SET execution.checkpointing.tolerable-failed-checkpoints = 3;
SET execution.checkpointing.timeout = 300s;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET pipeline.operator-chaining = false;

CREATE TABLE employees_source (
table_name STRING METADATA VIRTUAL,
emp_no int NOT NULL,
birth_date date,
first_name STRING,
last_name STRING,
gender STRING,
hire_date date,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.211.55.24',
'port' = '3306',
'username' = 'root',
'password' = '000000',
'database-name' = 'emp_1',
'table-name' = 'employees_[0-9]+'
);

create table sink_redis(
hsetKey VARCHAR,
hsetField VARCHAR,
hsetValue VARCHAR
)
with ('connector'='redis',
'redis-mode'='single',
'host'='hadoop102',
'port' = '6379',
'maxTotal' = '100',
'maxIdle' = '5',
'minIdle' = '5',
'sink.parallelism' = '1',
'sink.max-retries' = '3',
'command'='hset'
);

create view temp_view (hsetKey, hsetField, hsetValue) AS
select
table_name as hsetKey,
CAST(emp_no as STRING) as hsetField,
'{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
'"' || 'first_name' || '":"' || first_name || '",' ||
'"' || 'last_name' || '":"' || last_name || '",' ||
'"' || 'gender' || '":"' || gender || '",' ||
'"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue
from employees_source;

insert into sink_redis select hsetKey, hsetField, hsetValue from temp_view;

SQL explanation:

'{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
'"' || 'first_name' || '":"' || first_name || '",' ||
'"' || 'last_name' || '":"' || last_name || '",' ||
'"' || 'gender' || '":"' || gender || '",' ||
'"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue

The expression above simply concatenates the table's columns and values into a JSON string to use as the value of the Redis hset; it is only a demonstration and has no other meaning. Built-in JSON construction functions are only available from Flink 1.15 onward, so || concatenation is used here. The Redis connector actually supports many more operations and configuration options; please refer to the GitHub repository for details.
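For reference only, on Flink 1.15+ the same value could be produced with the built-in JSON_OBJECT function instead of manual concatenation; a minimal sketch (not applicable to the Flink 1.14.4 setup used in this article):

-- Flink 1.15+ only: build the hset value as JSON directly
select
  table_name as hsetKey,
  CAST(emp_no as STRING) as hsetField,
  JSON_OBJECT(
    'birth_date' VALUE CAST(birth_date AS STRING),
    'first_name' VALUE first_name,
    'last_name' VALUE last_name,
    'gender' VALUE gender,
    'hire_date' VALUE CAST(hire_date AS STRING)
  ) as hsetValue
from employees_source;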

1.3 Run in per-job mode

Flink Web UI

As shown in the figure above, the streaming job has been successfully submitted by Dinky to the remote cluster.

1.4 Resulting data

1.5 Update data

Change the last_name value kunkun to momoda, for example with the statement below.
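A sketch of the corresponding MySQL statement (emp_no 10 in employees_1 is the row whose last_name is kunkun):

-- run against the source database emp_1
UPDATE employees_1 SET last_name = 'momoda' WHERE emp_no = 10;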

Data flow changes in the Flink Web UI

Check the data in Redis; it has been updated

1.6 Insert data

Add one row to the table

insert into employees_1 VALUES ("12", "1996-06-16", "dili", "reba", "F", "2000-07-25");

A new entry is successfully added in Redis

Scenario 2

Join Kafka data with Redis via Flink SQL

Data to be sent to Kafka:

{"company_id":"employees_1", "emp_id":"10", "log_id":"log_a_001"}

SET execution.checkpointing.interval = 10s;
SET execution.checkpointing.tolerable-failed-checkpoints = 3;
SET execution.checkpointing.timeout = 300s;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET pipeline.operator-chaining = false;

create table kafka_source(
company_id string,
emp_id string,
log_id string,
event_time as PROCTIME()
) with (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = '10.211.55.24:9092',
'properties.group.id' = 'test',
'properties.partition-discovery.interval-millis' = '30000',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);

create table dim_redis(
hsetKey VARCHAR,
hsetField VARCHAR,
hsetValue VARCHAR
)
with ('connector'='redis',
'redis-mode'='single',
'host'='hadoop102',
'port' = '6379',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '300',
'maxTotal' = '100',
'maxIdle' = '5',
'minIdle' = '5',
'sink.parallelism' = '1',
'sink.max-retries' = '3',
'command'='hget'
);

create table sink_table(
company_id varchar,
emp_id varchar,
log_id varchar,
event_time timestamp,
first_name varchar,
last_name varchar,
hire_date varchar
) with ('connector' = 'print');

create view temp_view as
select
company_id as company_id,
emp_id as emp_id,
log_id as log_id,
event_time as event_time,
JSON_VALUE(d.hsetValue, '$.first_name') as first_name,
JSON_VALUE(d.hsetValue, '$.last_name') as last_name,
JSON_VALUE(d.hsetValue, '$.hire_date') as hire_date
from
kafka_source as k
left join
dim_redis for system_time as of k.event_time as d
on
k.company_id = d.hsetKey
and
k.emp_id = d.hsetField;

insert into sink_table select * from temp_view;

2.2 Task panel

2.4 The data is successfully joined and printed to the console

Note: pipeline.operator-chaining is set to false only for temporary testing, so that the data flow graph can be observed in the Flink Web UI; setting it to false is not recommended for production workloads.
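For production jobs you would normally leave operator chaining at its Flink default (enabled), or set it explicitly; a minimal example:

-- keep operator chaining enabled (the Flink default) for production jobs
SET pipeline.operator-chaining = true;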