【问题标题】:How to create a Kafka Connect connector at worker creation time?如何在工作人员创建时创建 Kafka Connect 连接器?
【发布时间】:2020-07-01 23:37:11
【问题描述】:

假设我有一个使用 Docker 从 confluentinc/cp-kafka-connect 映像构建的 Kafka Connect 工作程序,该工作程序部署到服务器并启动一个工作程序。现在大多数时候,连接器已经存在,因为我使用 REST API 调用在端口 8083 上对 POST 进行了创建它。但是我将如何在工作人员启动时通过脚本创建我的连接器(如果它不存在) ?我可以在它启动后以某种方式让我的工作人员运行吗?

【问题讨论】:

    标签: docker apache-kafka apache-kafka-connect


    【解决方案1】:

    它需要一个重载的command

    原始问题:https://github.com/confluentinc/cp-docker-images/issues/467

    解决方案

      volumes:
        - $PWD/scripts:/scripts  # TODO: Create this folder ahead of time, on your host
      command: 
        - bash 
        - -c 
        - |
          /etc/confluent/docker/run & 
          echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
          while [ $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) -eq 000 ] ; do 
            echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
            sleep 5 
          done
          nc -vz kafka-connect 8083
          echo -e "\n--\n+> Creating Kafka Connector(s)"
          /scripts/create-connectors.sh  # Note: This script is stored externally from container
          sleep infinity
    

    【讨论】:

    • 其他解决方案是在任何地方启动connect-distributed,配置内部主题,发布连接器(保存到配置主题),然后启动N个容器,它们都选择相同的配置
    • 我不熟悉这个示例文件是什么。起初我以为是 YAML,但它看起来不像 helm 生成的我的 YAML。我认识那里的 shell 命令,但显然我的 Docker/Kubernetes 知识存在很大差距;你能指出来让我去研究吗?
    • 这是 yaml... 这是 docker compose,因为您从未说明以哪种方式运行容器。 Confluent Helm Charts 在部署中也应该有一个命令部分
    【解决方案2】:

    正如 cricket_007 所说,您可以将其嵌入到命令中并调用已安装的脚本,或者您可以将其全部内联,like this example。如果你这样做,请注意在命令部分,$ 被替换为$$ 以避免错误Invalid interpolation format for "command" option

      kafka-connect-01:
        image: confluentinc/cp-kafka-connect:5.4.0
    […]
        command: 
          - bash 
          - -c 
          - |
    […]
            echo "Launching Kafka Connect worker"
            /etc/confluent/docker/run & 
            #
            echo "Waiting for Kafka Connect to start listening on localhost ⏳"
            while : ; do
              curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
              echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
              if [ $$curl_status -eq 200 ] ; then
                break
              fi
              sleep 5 
            done
            echo -e "\n--\n+> Creating Data Generator source"
            curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
                -d '{
                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "kafka.topic": "ratings",
                "max.interval":750,
                "quickstart": "ratings",
                "tasks.max": 1
            }'
            sleep infinity
    

    【讨论】:

    • 我不熟悉这个示例文件是什么。起初我以为是 YAML,但它看起来不像 helm 生成的我的 YAML。我认识那里的 shell 命令,但显然我的 Docker/Kubernetes 知识存在很大差距;你能指出来让我去研究吗?
    • 啊抱歉 - 是的,这是来自 Docker Compose 文件的 YAML 提取。不幸的是,我不知道 k8s/helm 所以我不会翻译,但你基本上需要在运行时将 command 部分设置为你的 Docker 容器的命令。
    • @Pat Confluent Helm Charts 没有可用于连接的命令,只有 JMX 导出器。来源github.com/confluentinc/cp-helm-charts/blob/master/charts/…
    【解决方案3】:

    如果您使用像 Ansible 这样的工具进行自动化,这个配置可能有用:

    - hosts: kafka-connect-docker
      name: deploy kafka connect cluster
      become: yes
      gather_facts: yes
      serial: '{{ serial|default(1) }}'
      tasks:
        # it's not fully working example
        ...
    
        - name: run container
          notify: wait ports
          docker_container:
            name: kafka-connect
            image: "{{ docker_registry }}/kafka-connect:2.4.0-1.3.0"
            entrypoint: ["sh", "-c", "'exec /opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed.properties >> /var/log/kafka-connect/stderrout.log 2>&1'"]
            restart_policy: always
            network_mode: host
            state: started
    
        - name: call wait ports
          command: /bin/true
          notify: wait ports
    
      handlers:
        - name: restart container
          shell: docker restart kafka-connect
          notify: wait ports
    
        - name: wait ports
          wait_for: port=10900 timeout=300 host=127.0.0.1
          changed_when: True
          notify: check cluster status
    
        - name: check cluster status
          uri: 
            url: "http://127.0.0.1:10900/connectors"
            status_code: 200
          register: cluster_status_json_response
          until: cluster_status_json_response.status == 200
          retries: 60
          delay: 5
    
    
    - hosts: kafka-connect-docker[0]
      name: deploy connectors configs
      become: yes
      tasks:
        - name: restore connectors configs
          uri:
            url: "http://127.0.0.1:10900/connectors/{{ item }}/config"
            method: PUT
            return_content: yes
            body_format: json
            headers:
              Accept: "application/json"
              Content-Type: "application/json"
            body: "{{ lookup('template', 'roles/kafka-connect/templates/etc/kafka-connect/tasks/' + item + '.json') }}"
            status_code: 200, 201
            timeout: 60
          with_items: "{{ connector_configs }}"
    

    【讨论】:

      猜你喜欢
      • 2019-02-02
      • 2020-06-05
      • 2021-11-11
      • 2017-06-02
      • 1970-01-01
      • 2018-09-19
      • 2017-04-20
      • 2019-09-06
      • 2023-03-24
      相关资源
      最近更新 更多