使用Kafka Connect同步关系型数据库

启动Kafka

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

启动MySQL和Elasticsearch

mysql.server start
elasticsearch &

Kafka Connect 部署

  1. 下载kafka connect JDBC

    • 将下载的Kafka connect JDBC和Driver插件jar包放在
      {KFAKA_HOME}/plugins
      
      目录下
  2. 修改配置文件,分布式

    分布式

    • 修改{KFAKA_HOME}/config/connect-distributed.properties配置文件,添加插件路径plugin.path={KFAKA_HOME}/plugins

      本地模式

    • 修改{KFAKA_HOME}/config/connect-standalone.properties配置文件,添加插件路径plugin.path={KFAKA_HOME}/plugins

  3. connector配置

    增量字段必须not null, 不同的同步列启动不同的connector source or sink

    JdbcSourceConnector配置

    • HTTP请求

        curl -XPOST "http://localhost:8083/connectors" -H 'Content-Type: application/json' -d '{
            "name":"aflm-source",
            "config":{
                "connection.url":"jdbc:oracle:thin:@//10.25.84.8:1521/aflm",
                "connection.user":"aflmdata",
                "connection.password":"aflmdata123",
                "dialect.name":"OracleDatabaseDialect",
                "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
                "table.whitelist":"SAPLOANNODE",
                "mode":"timestamp",
                "timestamp.column.name":"SLN_COMPLETIONTIME",
                "topic.prefix":"aflm.",
                "db.timezone":"Asia/Shanghai" 
            }
        }'
      
    • 配置文件 aflm-source.properties

        name=aflm-source
        connection.url=jdbc:oracle:thin:@//10.25.84.8:1521/aflm
        connection.use=aflmdata
        connection.password=aflmdata123
        dialect.name=OracleDatabaseDialect
        connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
        table.whitelist=SAPLOANNODE
        mode=timestamp
        timestamp.column.name=SLN_COMPLETIONTIME
        topic.prefix=aflm.
        db.timezone=Asia/Shanghai
      

      JdbcSinkConnector配置

    • HTTP请求

        curl -XPOST "http://localhost:8083/connectors" -H 'Content-Type: application/json' -d '{
            "name":"aflm-sink",
            "config":{
                "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
                "topics.regex":"aflm.*",
                "connection.url":"jdbc:postgresql://127.0.0.1:5432/aflm",
                "connection.user":"postgres",
                "connection.password":"postgres",
                "insert.mode":"upsert",
                "batch.size":"3000",
                "pk.mode":"record_value",
                "pk.fields":"SLN_PK",
                "auto.create":"true",
                "auto.evolve":"true",
                "db.timezone":"Asia/Shanghai"
            }
        }'
      
    • 配置文件 aflm-sink.properties

        name=aflm-sink
        connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
        topics.regex=aflm.*
        connection.url=jdbc:postgresql://127.0.0.1:5432/aflm
        connection.user=postgres
        connection.password=postgres
        insert.mode=upsert
        batch.size=3000
        pk.mode=record_value
        pk.fields=SLN_PK
        auto.create=true
        auto.evolve=true
        db.timezone=Asia/Shanghai
      

      ElasticsearchSinkConnector配置

    • HTTP请求

        curl -XPOST "http://localhost:8083/connectors" -H 'Content-Type: application/json' -d '{
            "name":"elastic-login-connector",
            "config":{
                "connector.class":"ElasticsearchSinkConnector",
                "connection.url":"http://localhost:9200",
                "type.name":"mysql-data",
                "topics":"mysql.login",
                "key.ignore":true
            }
        }'
      

启动connector

启动分布式节点,connector需要使用REST接口POST配置信息

bin/connect-distributed.sh config/connect-distributed.properties

启动单机节点,connector可以通过配置文件启动

bin/connect-standalone.sh config/connect-standalone.properties config/kafka-connect-jdbc/aflm-source.properties config/kafka-connect-jdbc/aflm-sink.properties

查看connector

curl -XGET "http://localhost:8083/connectors"

删除connector

curl -XDELETE "http://localhost:8083/connectors/connector_name"

results matching ""

    No results matching ""