Syncing Relational Databases with Kafka Connect
Start Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
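Before moving on, it can help to confirm both processes are listening. A minimal Python sketch using only the standard library; the ports (2181 for ZooKeeper, 9092 for the broker) are the defaults and are assumptions here:

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Default ports (assumptions): 2181 for ZooKeeper, 9092 for the Kafka broker.
for name, port in [("zookeeper", 2181), ("kafka", 9092)]:
    print(f"{name}: {'up' if port_open('localhost', port) else 'down'}")
```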
Start MySQL and Elasticsearch
mysql.server start
elasticsearch &
Deploying Kafka Connect
- Place the downloaded Kafka Connect JDBC and JDBC driver plugin JARs in the {KAFKA_HOME}/plugins directory
Edit the worker configuration
Distributed mode
Edit the {KAFKA_HOME}/config/connect-distributed.properties configuration file and add the plugin path: plugin.path={KAFKA_HOME}/plugins
Standalone mode
Edit the {KAFKA_HOME}/config/connect-standalone.properties configuration file and add the plugin path: plugin.path={KAFKA_HOME}/plugins
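Beyond plugin.path, a standalone worker needs a few more settings to run. A minimal sketch of connect-standalone.properties; hosts, converters, and the offsets path are illustrative assumptions, not values from this setup:

```properties
# Minimal standalone worker config (illustrative values)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path={KAFKA_HOME}/plugins
```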
Connector configuration
The incremental column must be NOT NULL. To sync on different columns, start a separate connector (source or sink) for each.
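When a timestamp column alone cannot uniquely order rows, the JDBC source connector also supports combining it with an auto-incrementing column via mode=timestamp+incrementing. A sketch with hypothetical column names (UPDATED_AT and ID are placeholders, not columns from this schema):

```properties
# Hypothetical columns: UPDATED_AT (NOT NULL timestamp) and ID (auto-increment PK)
mode=timestamp+incrementing
timestamp.column.name=UPDATED_AT
incrementing.column.name=ID
```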
JdbcSourceConnector configuration
HTTP request
curl -XPOST "http://localhost:8083/connectors" -H 'Content-Type: application/json' -d '{
  "name": "aflm-source",
  "config": {
    "connection.url": "jdbc:oracle:thin:@//10.25.84.8:1521/aflm",
    "connection.user": "aflmdata",
    "connection.password": "aflmdata123",
    "dialect.name": "OracleDatabaseDialect",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "table.whitelist": "SAPLOANNODE",
    "mode": "timestamp",
    "timestamp.column.name": "SLN_COMPLETIONTIME",
    "topic.prefix": "aflm.",
    "db.timezone": "Asia/Shanghai"
  }
}'
Configuration file: aflm-source.properties
name=aflm-source
connection.url=jdbc:oracle:thin:@//10.25.84.8:1521/aflm
connection.user=aflmdata
connection.password=aflmdata123
dialect.name=OracleDatabaseDialect
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.whitelist=SAPLOANNODE
mode=timestamp
timestamp.column.name=SLN_COMPLETIONTIME
topic.prefix=aflm.
db.timezone=Asia/Shanghai
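The same registration can be scripted instead of typed as curl. A minimal Python sketch using only the standard library, assuming the Connect REST API is at localhost:8083; the network call is left commented out so the script is safe to run without a worker:

```python
import json
from urllib import request

CONNECT_URL = "http://localhost:8083/connectors"  # Connect REST endpoint (assumption)

def connector_payload(name: str, config: dict) -> bytes:
    """Build the JSON body expected by POST /connectors."""
    return json.dumps({"name": name, "config": config}).encode("utf-8")

def register(payload: bytes) -> None:
    """POST a connector definition to the Connect REST API."""
    req = request.Request(CONNECT_URL, data=payload,
                          headers={"Content-Type": "application/json"})
    with request.urlopen(req) as resp:
        print(resp.status, resp.read().decode())

source_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//10.25.84.8:1521/aflm",
    "connection.user": "aflmdata",
    "connection.password": "aflmdata123",
    "dialect.name": "OracleDatabaseDialect",
    "table.whitelist": "SAPLOANNODE",
    "mode": "timestamp",
    "timestamp.column.name": "SLN_COMPLETIONTIME",
    "topic.prefix": "aflm.",
    "db.timezone": "Asia/Shanghai",
}
payload = connector_payload("aflm-source", source_config)
# register(payload)  # uncomment once the Connect worker is running
print(payload.decode())
```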
JdbcSinkConnector configuration
HTTP request
curl -XPOST "http://localhost:8083/connectors" -H 'Content-Type: application/json' -d '{
  "name": "aflm-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics.regex": "aflm.*",
    "connection.url": "jdbc:postgresql://127.0.0.1:5432/aflm",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "batch.size": "3000",
    "pk.mode": "record_value",
    "pk.fields": "SLN_PK",
    "auto.create": "true",
    "auto.evolve": "true",
    "db.timezone": "Asia/Shanghai"
  }
}'
Configuration file: aflm-sink.properties
name=aflm-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics.regex=aflm.*
connection.url=jdbc:postgresql://127.0.0.1:5432/aflm
connection.user=postgres
connection.password=postgres
insert.mode=upsert
batch.size=3000
pk.mode=record_value
pk.fields=SLN_PK
auto.create=true
auto.evolve=true
db.timezone=Asia/Shanghai
ElasticsearchSinkConnector configuration
HTTP request
curl -XPOST "http://localhost:8083/connectors" -H 'Content-Type: application/json' -d '{
  "name": "elastic-login-connector",
  "config": {
    "connector.class": "ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "type.name": "mysql-data",
    "topics": "mysql.login",
    "key.ignore": true
  }
}'
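After registering any of these connectors, the worker's status endpoint tells you whether it actually started. A minimal stdlib sketch, assuming the REST API at localhost:8083; the live call is commented out so the snippet runs without a worker:

```python
import json
from urllib import request

def status_url(name: str, base: str = "http://localhost:8083") -> str:
    """Build the status endpoint URL for a named connector."""
    return f"{base}/connectors/{name}/status"

def connector_status(name: str, base: str = "http://localhost:8083") -> dict:
    """Fetch and parse GET /connectors/<name>/status from the Connect REST API."""
    with request.urlopen(status_url(name, base)) as resp:
        return json.loads(resp.read())

# Example (requires a running Connect worker):
# print(connector_status("elastic-login-connector")["connector"]["state"])
```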
Starting connectors
Start a distributed worker; connectors must then be configured by POSTing their config via the REST interface
bin/connect-distributed.sh config/connect-distributed.properties
Start a standalone worker; connectors can be started directly from configuration files
bin/connect-standalone.sh config/connect-standalone.properties config/kafka-connect-jdbc/aflm-source.properties config/kafka-connect-jdbc/aflm-sink.properties
List connectors
curl -XGET "http://localhost:8083/connectors"
Delete a connector
curl -XDELETE "http://localhost:8083/connectors/connector_name"