Canal
Canal 主要用途是基于MySQL 数据库增量日志解析,提供增量数据订阅和消费
部署canal 单机版
canal 的工作原理
MySQL 安装
mysql 主要用来存储canal 的一些信息
mysql 创建pvc 存储
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: mysql-pv
namespace: iot
spec:
capacity:
storage: 50Gi
volumeMode: Filesystem
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Recycle
storageClassName: slow
nfs:
path: /data/mysql-data
server: ###IP 地址
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: mysql-pvc
namespace: iot
spec:
accessModes:
- ReadWriteMany
volumeMode: Filesystem
resources:
requests:
storage: 50Gi
storageClassName: slow
apiVersion: v1
kind: Service
metadata:
name: mysql
namespace: iot
spec:
type: NodePort
ports:
- port: 3306
selector:
app: mysql
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
namespace: iot
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
- image: mysql:8.0.19
name: mysql
env:
# 在实际中使用 secret
- name: MYSQL_ROOT_PASSWORD
value: bdtp@2022
ports:
- containerPort: 3306
name: mysql
volumeMounts:
- name: mysql-data
mountPath: /var/lib/mysql
volumes:
- name: mysql-data
persistentVolumeClaim:
claimName: mysql-pvc
创建canal 数据库
create database canal_manager default character set utf8mb4 collate utf8mb4_unicode_ci;
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
grant all on canal_manager.* to 'canal'@'%' ;
flush privileges;
#其中8.0远程连接要修改为mysql_native_password 认证连接,否则连接失败,如果不是mysql_native_password 则使用以下修改语句
## ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
导入sql
https://github.com/alibaba/canal/blob/canal-1.1.6/admin/admin-web/src/main/resources/canal_manager.sql
canal-admin 创建
apiVersion: v1
kind: ConfigMap
metadata:
name: canal-admin
namespace: iot
data:
admin_user: "admin"
admin_password: "123456"
datasource_address: "mysql:3306"
datasource_database: "canal_manager"
datasource_username: "canal"
datasource_password: "canal"
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: canal-admin
namespace: iot
labels:
app.kubernetes.io/name: canal-admin
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: canal-admin
template:
metadata:
name: canal-admin
creationTimestamp: null
labels:
app.kubernetes.io/name: canal-admin
spec:
containers:
- name: canal-admin
image: 'canal/canal-admin:v1.1.6'
imagePullPolicy: IfNotPresent
ports:
- name: web
containerPort: 8089
protocol: TCP
env:
- name: server.port
value: '8089'
- name: canal.adminUser
valueFrom:
configMapKeyRef:
name: canal-admin
key: admin_user
- name: canal.adminPasswd
valueFrom:
configMapKeyRef:
name: canal-admin
key: admin_password
- name: spring.datasource.address
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_address
- name: spring.datasource.database
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_database
- name: spring.datasource.username
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_username
- name: spring.datasource.password
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_password
resources:
requests:
cpu: 250m
memory: 256Mi
livenessProbe:
httpGet:
path: /
port: 8089
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
readinessProbe:
httpGet:
path: /
port: 8089
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
restartPolicy: Always
nodeSelector: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: canal-admin
namespaces:
- iot
topologyKey: kubernetes.io/hostname
revisionHistoryLimit: 10
---
kind: Service
apiVersion: v1
metadata:
name: canal-admin
namespace: iot
spec:
type: NodePort
ports:
- protocol: TCP
port: 8089
targetPort: 8089
selector:
app.kubernetes.io/name: canal-admin
canal-server 部署
apiVersion: v1
kind: ConfigMap
metadata:
name: canal-server
namespace: iot
data:
admin_manager: "canal-admin:8089"
admin_port: "11110"
admin_user: "admin"
admin_password: "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9" # 123456加密
#admin_register_cluster: "local" ##集群名称
admin_register_auto: "true"
admin_register_name: "iot-canal-server" ##canal-server 自动注册名称
---
kind: StatefulSet ##canal-server 采用StatefulSet 方式部署
apiVersion: apps/v1
metadata:
name: canal-server
namespace: iot
labels:
app.kubernetes.io/name: canal-server
app: canal-server
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: canal-server
app: canal-server
template:
metadata:
name: canal-server
labels:
app.kubernetes.io/name: canal-server
app: canal-server
spec:
imagePullSecrets:
- name: secret
containers:
- name: canal-server
image: 'canal/canal-server:v1.1.6'
imagePullPolicy: Always
ports:
- name: tcp
containerPort: 11111
protocol: TCP
env:
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: SERVICE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: 'metadata.labels[''app'']'
- name: STS_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: canal.register.ip
value: $(POD_NAME).$(SERVICE_NAME).$(STS_NAMESPACE) ## canal-server采用实例名,防止每次重启导致的IP不同,Instance实例选择时的变化
- name: canal.admin.manager
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_manager
- name: canal.admin.port
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_port
- name: canal.admin.user
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_user
- name: canal.admin.passwd
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_password
#- name: canal.admin.register.cluster
# valueFrom:
# configMapKeyRef:
# name: canal-server
# key: admin_register_cluster
- name: canal.admin.register.auto
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_register_auto
- name: canal.admin.register.name
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_register_name
resources:
requests:
cpu: 250m
memory: 256Mi
livenessProbe:
tcpSocket:
port: 11112
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
readinessProbe:
tcpSocket:
port: 11112
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
restartPolicy: Always
nodeSelector: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: canal-server
namespaces:
- iot
topologyKey: kubernetes.io/hostname
serviceName: canal-server
revisionHistoryLimit: 10
---
kind: Service
apiVersion: v1
metadata:
name: canal-server
namespace: iot
spec:
ports:
- protocol: TCP
port: 11110
targetPort: 11110
type: ClusterIP
selector:
app.kubernetes.io/name: canal-server
app: canal-server
canlal-admin 配置修改
kafka 同步
canal-admin 运行正常之后,就可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
可以看到默认已经注册了
点击操作—修改主配置-点击保存即可
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# 默认是tcp模式,修改为kafka
######### Kafka #############
##################################################
kafka.bootstrap.servers = 172.19.0.64:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='用户名' password='密码';
kafka.sasl.mechanism=PLAIN
kafka.security.protocol=PLAINTEXT
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
canal 创建instance
配置kafka 同步
## 需要修改的部分,源库的ip数据库地址
canal.instance.master.address=127.0.0.1:3306
# 源库的用户名与密码
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 匹配同步的数据库规则
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
# kafka topic 的名字,可以在kafka创建,与kafka topic对应,如果不填写,会自动创建
canal.mq.topic=example
#备注: 需要在源库中创建用户名与密码并且同时具有同步的权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
kafka 查看消息
也可以查看instance 的日志
rabbitmq 同步
rabbitmq 的k8s 部署
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: rabbitmq
spec:
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:3.7.13-management
imagePullPolicy: IfNotPresent
ports:
- containerPort: 5672
name: server-port
- containerPort: 15672
name: http-port
env:
- name: RABBITMQ_DEFAULT_VHOST
value: /iot
- name: RABBITMQ_DEFAULT_USER
value: admin
- name: RABBITMQ_DEFAULT_PASS
value: bdtp@2023
volumeMounts:
- name: persisitent-storage-rabbitmq
mountPath: /var/lib/rabbitmq
- name: localtime
readOnly: true
mountPath: /etc/localtime
volumes:
- name: persisitent-storage-rabbitmq
hostPath:
path: /data/rabbitmq
- name: localtime
hostPath:
type: File
path: /etc/localtime
---
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-service
labels:
app: rabbitmq
spec:
selector:
app: rabbitmq
type: NodePort
ports:
- name: server-port
port: 5672
targetPort: 5672
- name: http-port
port: 15672
targetPort: 15672
nodePort: 30351
在rabbitmq 上创建exchange
-
创建queues
-
queues 绑定exchanges
自定义Routig key 名字
修改主配置文件canal.properties
canal.serverMode = rabbitmq
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = rabbitmq-service.iot
rabbitmq.virtual.host = /iot
rabbitmq.exchange = canal-exchanges
rabbitmq.routingKey = canal-routing
rabbitmq.username = admin
rabbitmq.password = bdtp@2023
rabbitmq.deliveryMode = direct
rabbitmq.queue = canal-queues
修改配置文件instance.propertios
canal.instance.master.address=192.168.101.32:32060
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex
canal.instance.filter.regex=test01.test_ddl #同步的表
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=canal-routing # 这里填写的是rabbitmq.routingKey
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
验证rabbitmq 查看消息