Canal

Canal 主要用途是基于MySQL 数据库增量日志解析,提供增量数据订阅和消费

部署canal 单机版

canal 的工作原理

img
MySQL 安装

mysql 主要用来存储canal 的一些信息

mysql 创建pvc 存储

---
# NFS-backed PersistentVolume that stores the MySQL data directory.
# PersistentVolumes are cluster-scoped, so no namespace is set here.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: mysql-pv
spec:
  capacity:
    storage: 50Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteMany
  # Retain keeps the database files when the claim is released; the
  # deprecated Recycle policy would scrub the volume instead.
  persistentVolumeReclaimPolicy: Retain
  storageClassName: slow
  nfs:
    path: /data/mysql-data
    # Replace with the IP address of the NFS server.
    server: "<NFS_SERVER_IP>"

---
# Requests 50Gi of ReadWriteMany filesystem storage from the "slow"
# storage class for MySQL.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  namespace: iot
  name: mysql-pvc
spec:
  storageClassName: slow
  volumeMode: Filesystem
  accessModes: [ReadWriteMany]
  resources:
    requests:
      storage: 50Gi
---
# NodePort Service exposing port 3306 of the pods labelled app=mysql.
# The leading document separator keeps this manifest from being merged
# into the PersistentVolumeClaim that precedes it.
apiVersion: v1
kind: Service
metadata:
  name: mysql
  namespace: iot
spec:
  type: NodePort
  ports:
    - port: 3306
      targetPort: 3306
  selector:
    app: mysql
---
# MySQL 8.0.19 Deployment (Recreate strategy) whose data directory is
# mounted from the mysql-pvc claim.
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: iot
  name: mysql
spec:
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      volumes:
        - name: mysql-data
          persistentVolumeClaim:
            claimName: mysql-pvc
      containers:
        - name: mysql
          image: mysql:8.0.19
          ports:
            - name: mysql
              containerPort: 3306
          env:
            # In a real deployment, supply this through a Secret.
            - name: MYSQL_ROOT_PASSWORD
              value: 'bdtp@2022'
          volumeMounts:
            - name: mysql-data
              mountPath: /var/lib/mysql
创建canal 数据库
# Create the canal_manager schema and a canal account with full rights on it.
CREATE DATABASE canal_manager
  DEFAULT CHARACTER SET utf8mb4
  COLLATE utf8mb4_unicode_ci;
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT ALL ON canal_manager.* TO 'canal'@'%';
FLUSH PRIVILEGES;
# On MySQL 8.0 remote connections must use mysql_native_password
# authentication, otherwise the connection fails. If the account uses a
# different plugin, switch it with the statement below:
# ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
导入sql 
https://github.com/alibaba/canal/blob/canal-1.1.6/admin/admin-web/src/main/resources/canal_manager.sql
canal-admin 创建
# Admin credentials and datasource settings that the canal-admin
# Deployment reads through configMapKeyRef.
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: iot
  name: canal-admin
data:
  admin_user: 'admin'
  admin_password: '123456'
  datasource_address: 'mysql:3306'
  datasource_database: 'canal_manager'
  datasource_username: 'canal'
  datasource_password: 'canal'

---

# canal-admin Deployment listening on port 8089. Apart from the port,
# every environment value is read from the canal-admin ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: iot
  name: canal-admin
  labels:
    app.kubernetes.io/name: canal-admin
spec:
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/name: canal-admin
  template:
    metadata:
      name: canal-admin
      creationTimestamp: null
      labels:
        app.kubernetes.io/name: canal-admin
    spec:
      restartPolicy: Always
      nodeSelector: {}
      affinity:
        podAntiAffinity:
          # Soft preference: avoid nodes already running a canal-admin pod.
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                topologyKey: kubernetes.io/hostname
                namespaces: [iot]
                labelSelector:
                  matchLabels:
                    app.kubernetes.io/name: canal-admin
      containers:
        - name: canal-admin
          image: canal/canal-admin:v1.1.6
          imagePullPolicy: IfNotPresent
          ports:
            - name: web
              protocol: TCP
              containerPort: 8089
          env:
            - name: server.port
              value: "8089"
            - name: canal.adminUser
              valueFrom:
                configMapKeyRef: {name: canal-admin, key: admin_user}
            - name: canal.adminPasswd
              valueFrom:
                configMapKeyRef: {name: canal-admin, key: admin_password}
            - name: spring.datasource.address
              valueFrom:
                configMapKeyRef: {name: canal-admin, key: datasource_address}
            - name: spring.datasource.database
              valueFrom:
                configMapKeyRef: {name: canal-admin, key: datasource_database}
            - name: spring.datasource.username
              valueFrom:
                configMapKeyRef: {name: canal-admin, key: datasource_username}
            - name: spring.datasource.password
              valueFrom:
                configMapKeyRef: {name: canal-admin, key: datasource_password}
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
          # Both probes issue an HTTP GET against / on the web port.
          livenessProbe:
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 5
            httpGet:
              port: 8089
              path: /
          readinessProbe:
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 5
            httpGet:
              port: 8089
              path: /

---

# NodePort Service publishing port 8089 of the canal-admin pods.
apiVersion: v1
kind: Service
metadata:
  namespace: iot
  name: canal-admin
spec:
  type: NodePort
  selector:
    app.kubernetes.io/name: canal-admin
  ports:
    - port: 8089
      targetPort: 8089
      protocol: TCP

canal-server 部署

# canal-admin connection and auto-registration settings that the
# canal-server StatefulSet reads through configMapKeyRef.
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: iot
  name: canal-server
data:
  admin_manager: 'canal-admin:8089'
  admin_port: '11110'
  admin_user: 'admin'
  # Encrypted form of the password 123456.
  admin_password: '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9'
  # Cluster name (disabled).
  # admin_register_cluster: "local"
  admin_register_auto: 'true'
  # Name under which canal-server registers itself automatically.
  admin_register_name: 'iot-canal-server'

---
# canal-server is deployed as a StatefulSet so that each pod keeps a
# stable name across restarts.
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: canal-server
  namespace: iot
  labels:
    app.kubernetes.io/name: canal-server
    app: canal-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: canal-server
      app: canal-server
  template:
    metadata:
      name: canal-server
      labels:
        app.kubernetes.io/name: canal-server
        app: canal-server
    spec:
      imagePullSecrets:
        - name: secret
      containers:
        - name: canal-server
          image: 'canal/canal-server:v1.1.6'
          imagePullPolicy: Always
          ports:
            # Admin port: matches admin_port in the canal-server ConfigMap
            # and the port published by the canal-server Service.
            - name: admin
              containerPort: 11110
              protocol: TCP
            - name: tcp
              containerPort: 11111
              protocol: TCP
            # Port checked by the liveness and readiness probes below.
            - name: metrics
              containerPort: 11112
              protocol: TCP
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: SERVICE_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: 'metadata.labels[''app'']'
            - name: STS_NAMESPACE
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
            # Register by instance name rather than pod IP, so restarts
            # (which change the IP) do not disturb instance selection.
            # NOTE(review): this name only resolves when the governing
            # Service is headless -- confirm.
            - name: canal.register.ip
              value: $(POD_NAME).$(SERVICE_NAME).$(STS_NAMESPACE)
            - name: canal.admin.manager
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_manager
            - name: canal.admin.port
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_port
            - name: canal.admin.user
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_user
            - name: canal.admin.passwd
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_password
            # Cluster registration is disabled.
            # - name: canal.admin.register.cluster
            #   valueFrom:
            #     configMapKeyRef:
            #       name: canal-server
            #       key: admin_register_cluster
            - name: canal.admin.register.auto
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_register_auto
            - name: canal.admin.register.name
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_register_name
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
          livenessProbe:
            tcpSocket:
              port: 11112
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
          readinessProbe:
            tcpSocket:
              port: 11112
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
      restartPolicy: Always
      nodeSelector: {}
      affinity:
        podAntiAffinity:
          # Soft preference: avoid nodes already running a canal-server pod.
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app.kubernetes.io/name: canal-server
                namespaces:
                  - iot
                topologyKey: kubernetes.io/hostname
  serviceName: canal-server
  revisionHistoryLimit: 10

---
# Governing Service of the canal-server StatefulSet.
# It must be headless (clusterIP: None): only then does cluster DNS publish
# per-pod records such as canal-server-0.canal-server.iot, the name the
# StatefulSet passes to canal.register.ip.
# clusterIP is immutable, so an existing non-headless Service has to be
# deleted and recreated for this change to apply.
kind: Service
apiVersion: v1
metadata:
  name: canal-server
  namespace: iot
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - name: admin
      protocol: TCP
      port: 11110
      targetPort: 11110
  selector:
    app.kubernetes.io/name: canal-server
    app: canal-server
canal-admin 配置修改
kafka 同步

canal-admin 运行正常之后,就可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

image-20230317141632175

可以看到默认已经注册了

image-20230317141713415

依次点击“操作”→“修改主配置”,按下面的内容修改后点击“保存”即可

# Supported modes: tcp, kafka, rocketMQ, rabbitMQ.
# The default is tcp; it is switched to kafka here.
canal.serverMode = kafka

##################################################
#########              Kafka          #############
##################################################
kafka.bootstrap.servers = 172.19.0.64:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

# NOTE(review): SASL settings normally pair with a SASL_* security
# protocol; with PLAINTEXT they look unused -- confirm against the broker.
kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username='用户名' password='密码';
kafka.sasl.mechanism = PLAIN
kafka.security.protocol = PLAINTEXT

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
canal 创建instance

配置kafka 同步

# Part that must be adapted: address (ip:port) of the source database.
canal.instance.master.address = 127.0.0.1:3306

# Username and password for the source database.
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal

# Table regex: which schemas/tables are synchronised.
canal.instance.filter.regex = .*\\..*
# Table black regex: tables excluded from synchronisation.
canal.instance.filter.black.regex =

# MQ config.
# Name of the Kafka topic. It may be created in Kafka beforehand; if it is
# not filled in, it is created automatically.
canal.mq.topic = example

# Note: the account must be created in the source database and must hold
# the privileges required for synchronisation.
GRANT
  SELECT,
  REPLICATION SLAVE,
  REPLICATION CLIENT
ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
kafka 查看消息
image-20230317143006349

也可以查看instance 的日志

rabbitmq 同步

rabbitmq 的k8s 部署

---
# RabbitMQ 3.7.13 (management image) with its data directory on a
# hostPath volume. The Deployment and its Service are both pinned to the
# iot namespace so that the host name rabbitmq-service.iot, used in
# canal.properties, resolves.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq
  namespace: iot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: rabbitmq:3.7.13-management
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 5672
              name: server-port
            - containerPort: 15672
              name: http-port
          env:
            - name: RABBITMQ_DEFAULT_VHOST
              value: /iot
            - name: RABBITMQ_DEFAULT_USER
              value: admin
            - name: RABBITMQ_DEFAULT_PASS
              value: 'bdtp@2023'
          volumeMounts:
            - name: persisitent-storage-rabbitmq
              mountPath: /var/lib/rabbitmq
            # Mount the node's /etc/localtime read-only into the container.
            - name: localtime
              readOnly: true
              mountPath: /etc/localtime
      volumes:
        - name: persisitent-storage-rabbitmq
          hostPath:
            path: /data/rabbitmq
        - name: localtime
          hostPath:
            type: File
            path: /etc/localtime

---
# NodePort Service for RabbitMQ: port 5672 (server-port) and port 15672
# (http-port), the latter fixed to node port 30351.
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-service
  namespace: iot
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  type: NodePort
  ports:
    - name: server-port
      port: 5672
      targetPort: 5672
    - name: http-port
      port: 15672
      targetPort: 15672
      nodePort: 30351

在rabbitmq 上创建exchange

image-20231113195114242

  1. 创建queues

    image-20231113195210583

  2. queues 绑定exchanges

    image-20231113195334289

自定义Routing key 名字

修改主配置文件canal.properties

# Deliver change events through RabbitMQ.
canal.serverMode=rabbitmq

##################################################
#########            RabbitMQ         #############
##################################################
# Broker location and virtual host.
rabbitmq.host=rabbitmq-service.iot
rabbitmq.virtual.host=/iot
# Credentials.
rabbitmq.username=admin
rabbitmq.password=bdtp@2023
# Exchange, queue and routing key.
rabbitmq.exchange=canal-exchanges
rabbitmq.queue=canal-queues
rabbitmq.routingKey=canal-routing
rabbitmq.deliveryMode=direct

修改配置文件instance.properties

# Instance settings for RabbitMQ delivery.
# Comments are kept on their own lines: .properties files have no inline
# comment syntax, so any "# ..." text after a value becomes part of that
# value (which would corrupt the table filter and the routing key).

# Address (host:port) of the source MySQL database.
canal.instance.master.address=192.168.101.32:32060
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex: the table to synchronise
canal.instance.filter.regex=test01.test_ddl
# table black regex
canal.instance.filter.black.regex=

# mq config
# Set this to the value of rabbitmq.routingKey.
canal.mq.topic=canal-routing
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

验证rabbitmq 查看消息

image-20231113201133419

comments powered by Disqus