找回密码
 立即注册
首页 业界区 安全 HTTP接口数据也能定时同步入湖?用DolphinScheduler ...

HTTP接口数据也能定时同步入湖?用DolphinScheduler×SeaTunnel快速搞定!

鞠彗云 2025-5-31 23:32:29
背景与目标

我们之前曾评估使用过SeaTunnel做CDC入湖验证:SeaTunnel-CDC入湖实践,这些场景都是能直连数据库的场景,业务需求中经常会出现无法直连数据库做CDC进行数据同步的场景,而这些场景就需要使用API进行数据对接,用Apache DolphinScheduler定时同步数据。
举个实际中的例子:

  • ERP(SAP)的库存数据进行同步入湖仓做库存分析
同时,本次目标希望其他同事能依样画葫芦,在以后的对接http接口到湖仓的时候能够自行完成,而非每遇到一个对接需求,就需要通过代码方式进行对接。
准备工作


  • seatunnel 2.3.10
首先,您需要在${SEATUNNEL_HOME}/config/plugin_config文件中加入连接器名称,然后,执行命令来安装连接器,确认连接器在${SEATUNNEL_HOME}/connectors/目录下即可。
本例中我们会用到:connector-jdbc、connector-paimon
写入StarRocks也可以使用connector-starrocks,本例中的场景比较适合用connector-jdbc,所以使用connector-jdbc。
  1. # 配置连接器名称
  2. --connectors-v2--
  3. connector-jdbc
  4. connector-starrocks
  5. connector-paimon
  6. --end--
复制代码
  1. # 安装连接器
  2. sh bin/install-plugin.sh 2.3.10
复制代码
SeaTunnel任务

我们先至少保证能在本地完成SeaTunnel任务,再完成对Apache DolphinScheduler的对接。

  • http to starRocks
    example/http2starrocks
  1. env {
  2.   parallelism = 1
  3.   job.mode = "BATCH"
  4. }
  5. source {
  6.   Http {
  7.     plugin_output = "stock"
  8.     url = "https://ip/http/prd/query_sap_stock"
  9.     method = "POST"
  10.     headers {
  11.       Authorization = "Basic XXX"
  12.       Content-Type = "application/json"
  13.     }
  14.     body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
  15.     format = "json"
  16.     content_field = "$.ET_RETURN.*"
  17.     schema {
  18.       fields {
  19.         MATNR = "string"
  20.         MAKTX = "string"
  21.         WERKS = "string"
  22.         NAME1 = "string"
  23.         LGORT = "string"
  24.         LGOBE = "string"
  25.         CHARG = "string"
  26.         MEINS = "string"
  27.         LABST = "double"
  28.         UMLME = "double"
  29.         INSME = "double"
  30.         EINME = "double"
  31.         SPEME = "double"
  32.         RETME = "double"
  33.       }
  34.     }
  35.   }
  36. }
  37. # 此转换操作主要用于字段从命名等方便用途
  38. transform {
  39.   Sql {
  40.     plugin_input = "stock"
  41.     plugin_output = "stock-tf-out"
  42.     query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
  43.   }
  44. }
  45. # 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写
  46. sink {
  47.     jdbc {
  48.         plugin_input = "stock-tf-out"
  49.         url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  50.         driver = "com.mysql.cj.jdbc.Driver"
  51.         user = "lab"
  52.         password = "XXX"
  53.         compatible_mode="starrocks"
  54.         query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
  55.         }
  56. }
  57. # connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景
  58. // sink {
  59. //   StarRocks {
  60. //     plugin_input = "stock-tf-out"
  61. //     nodeUrls = ["ip:8030"]
  62. //     base-url = "jdbc:mysql://ip:9030/"
  63. //     username = "lab"
  64. //     password = "XXX"
  65. //     database = "scm"
  66. //     table = "ods_sap_stock"
  67. //     batch_max_rows = 1000
  68. //     data_save_mode="DROP_DATA"
  69. //     starrocks.config = {
  70. //       format = "JSON"
  71. //       strip_outer_array = true
  72. //     }
  73. //     schema_save_mode = "RECREATE_SCHEMA"
  74. //     save_mode_create_template="""
  75. //       CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
  76. //         MATNR STRING COMMENT '物料',
  77. //         WERKS STRING COMMENT '工厂',
  78. //         LGORT STRING COMMENT '库存地点',
  79. //         MAKTX STRING COMMENT '物料描述',
  80. //         NAME1 STRING COMMENT '工厂名称',
  81. //         LGOBE STRING COMMENT '地点描述',
  82. //         CHARG STRING COMMENT '批次编号',
  83. //         MEINS STRING COMMENT '单位',
  84. //         LABST DOUBLE COMMENT '非限制使用库存',
  85. //         UMLME DOUBLE COMMENT '在途库存',
  86. //         INSME DOUBLE COMMENT '质检库存',
  87. //         EINME DOUBLE COMMENT '受限制使用的库存',
  88. //         SPEME DOUBLE COMMENT '已冻结的库存',
  89. //         RETME DOUBLE COMMENT '退货'
  90. //       ) ENGINE=OLAP
  91. //       PRIMARY KEY ( MATNR,WERKS,LGORT)
  92. //       COMMENT 'sap库存'
  93. //       DISTRIBUTED BY HASH (WERKS) PROPERTIES (
  94. //       "replication_num" = "1"
  95. //       )
  96. //     """
  97. //   }
  98. // }
复制代码

  • http to paimon
    example/http2paimon
  1. env {
  2.   parallelism = 1
  3.   job.mode = "BATCH"
  4. }
  5. source {
  6.   Http {
  7.     plugin_output = "stock"
  8.     url = "https://ip/http/prd/query_sap_stock"
  9.     method = "POST"
  10.     headers {
  11.       Authorization = "Basic XXX"
  12.       Content-Type = "application/json"
  13.     }
  14.     body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
  15.     format = "json"
  16.     content_field = "$.ET_RETURN.*"
  17.     schema {
  18.       fields {
  19.         MATNR = "string"
  20.         MAKTX = "string"
  21.         WERKS = "string"
  22.         NAME1 = "string"
  23.         LGORT = "string"
  24.         LGOBE = "string"
  25.         CHARG = "string"
  26.         MEINS = "string"
  27.         LABST = "double"
  28.         UMLME = "double"
  29.         INSME = "double"
  30.         EINME = "double"
  31.         SPEME = "double"
  32.         RETME = "double"
  33.       }
  34.     }
  35.   }
  36. }
  37. # 此转换操作主要用于字段从命名等方便用途
  38. transform {
  39.   Sql {
  40.     plugin_input = "stock"
  41.     plugin_output = "stock-tf-out"
  42.     query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
  43.   }
  44. }
  45. # 连接paimon进行数据同步,paimon 暂时 未看到有支持 insert overwrite 分区覆写,此例仅作为参考,不适用本此例子需求
  46. sink {
  47.   Paimon {
  48.     warehouse = "s3a://test/"
  49.     database = "sap"
  50.     table = "ods_sap_stock"
  51.     paimon.hadoop.conf = {
  52.         fs.s3a.access-key=XXX
  53.         fs.s3a.secret-key=XXX
  54.         fs.s3a.endpoint="http://minio:9000"
  55.         fs.s3a.path.style.access=true
  56.         fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
  57.     }
  58.   }
  59. }
复制代码
DolphinScheduler集成SeaTunnel


  • 制作worker镜像
  1. FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2
  2. RUN mkdir /opt/seatunnel
  3. RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10
  4. # 容器集成seatunnel
  5. COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/
复制代码
打包镜像,推送到镜像仓库
  1. docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
复制代码

  • 使用新镜像部署一个worker,此处修改docker-compose.yaml,增加一个dolphinscheduler-worker-seatunnel节点。
  1. ...
  2.   dolphinscheduler-worker-seatunnel:
  3.     image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
  4.     profiles: ["all"]
  5.     env_file: .env
  6.     healthcheck:
  7.       test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]
  8.       interval: 30s
  9.       timeout: 5s
  10.       retries: 3
  11.     depends_on:
  12.       dolphinscheduler-zookeeper:
  13.         condition: service_healthy
  14.     volumes:
  15.       - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
  16.       - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
  17.       - ./dolphinscheduler-shared-local:/opt/soft
  18.       - ./dolphinscheduler-resource-local:/dolphinscheduler
  19.     networks:
  20.       dolphinscheduler:
  21.         ipv4_address: 172.15.0.18
  22. ...
复制代码

  • DolphinScheduler配置SeaTunnel分组及环境配置

    • 安全中心-Worker分组管理,创建一个这个节点ip的分组,用于以后需要seatunnel的任务跑该分组
      1.png

    • 环境管理-创建环境,增加一个用于执行seatunnel的环境,同时需要绑定Worker分组为上一步创建的seatunnel分组
      2.png

    • 创建工作流定义,把上面的seatunnel任务配置填写上
      3.png

    • 运行时候,选择seatunnel的worker分组和环境即可跑在这个集成了seatunnel的环境上
      4.png


转载自俊瑶先森
原文链接:https://junyao.tech/posts/9c6867c5.html
本文由 白鲸开源 提供发布支持!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册