Kafka Connect和Oracle Number数据类型转换
默认情况下,Kafka Connect JDBC Connector无法很好地应对:
- NUMBER没有定义精度/比例的列。您最终可能会bytes在输出中出现明显的垃圾(),或者只是错误。
- TIMESTAMP WITH LOCAL TIME ZONE。JDBC type -102 not currently supported在日志中引发警告。
如何解决
Confluent Platform 4.1.1中的新功能: numeric.mapping
在连接器配置中,设置 "numeric.mapping":"best_fit"
Confluent Platform 4.1.1(Doc)中的新功能
避免这个问题
更改源对象的DDL。例如:
1. 提高NUMBER精度和规模
2. 使用TIMESTAMP受支持的类型
query中CAST数据类型
直接从对象拉出,并query在JDBC连接器中使用(而不是table.whitelist) - 并适当地转换列
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "jdbc_source_oracle_soe_logon_07",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
"mode": "incrementing",
"query": "SELECT CAST(LOGON_ID AS NUMERIC(8,0)) AS LOGON_ID, CAST(CUSTOMER_ID AS NUMERIC(18,0)) AS CUSTOMER_ID, LOGON_DATE FROM LOGON",
"poll.interval.ms": "1000",
"incrementing.column.name":"LOGON_ID",
"topic.prefix": "ora-soe-07-LOGON",
"validate.non.null":false
}
}'
使用源数据库中的View来转换数据类型
在源DB中定义一个适当地转换列的视图,然后使用连接器代替(确保包含"table.types":"VIEW")
CREATE VIEW VW_LOGON AS SELECT CAST(LOGON_ID AS NUMERIC(8,0)) AS LOGON_ID, CAST(CUSTOMER_ID AS NUMERIC(18,0)) AS CUSTOMER_ID, LOGON_DATE FROM LOGON;
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "jdbc_source_oracle_soe_logon_05",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
"table.whitelist":"VW_LOGON",
"table.types":"VIEW",
"mode": "incrementing",
"poll.interval.ms": "1000",
"incrementing.column.name":"LOGON_ID",
"topic.prefix": "ora-soe-05-",
"validate.non.null":false
}
}'