找回密码
 立即注册
首页 业界区 安全 MySQL 数据同步至 S3file,并接入 Hive 访问:SeaTunnel ...

MySQL 数据同步至 S3file,并接入 Hive 访问:SeaTunnel 实践指南

零幸 2025-7-17 16:09:48
作者 | 番兄
如何借助 Apache SeaTunnel 将 MySQL 数据高效同步至 S3file?本文详述的步骤已全部通过测试验证,适用于构建基于对象存储的数据中台场景,具备部署灵活、扩展性强等优势,对有 MySQL 到 S3 数据集成需求的用户具有较高的参考价值,点赞、收藏学习吧!
第一步:创建Hive表
  1. CREATE EXTERNAL TABLE ods_ekp.`ods_sys_notify_todo_bak` (
  2.   `fd_id` STRING,
  3.   `fd_app_name` STRING,
  4.   `fd_model_name` STRING,
  5.   `fd_model_id` STRING,
  6.   `fd_key` STRING,
  7.   `fd_parameter1` STRING,
  8.   `fd_parameter2` STRING,
  9.   `fd_create_time` TIMESTAMP,
  10.   `fd_subject` STRING,
  11.   `fd_type` INT,
  12.   `fd_link` STRING,
  13.   `fd_mobile_link` STRING,
  14.   `fd_pad_link` STRING,
  15.   `fd_bundle` STRING,
  16.   `fd_replace_text` STRING,
  17.   `fd_md5` STRING,
  18.   `fd_del_flag` STRING,
  19.   `fd_level` INT,
  20.   `doc_creator_id` STRING,
  21.   `fd_extend_content` STRING,
  22.   `fd_lang` STRING,
  23.   `fd_cate_name` STRING,
  24.   `fd_cate_id` STRING,
  25.   `fd_template_name` STRING,
  26.   `fd_template_id` STRING,
  27.   `fd_hierarchy_id` STRING
  28. )
  29. COMMENT 'sys_notify_todo_bak data'
  30. PARTITIONED BY (
  31.   `dctime` STRING COMMENT '分区年月日'
  32. )
  33. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
  34. STORED AS PARQUET
  35. LOCATION 's3a://seatunnel/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak'
  36. TBLPROPERTIES (
  37.   'parquet.compression'='ZSTD'
  38. );
复制代码
注意:

  • ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' 这个分隔符设置需要在后面SeaTunnel里面配置一样的,不然格式错误;
  • 'parquet.compression'='ZSTD' 压缩算法也是需要在后面SeaTunnel里面配置一样的;
  • STORED AS PARQUET parquet文件格式,也是需要在后面SeaTunnel里面配置一样的.
是用之前把注释删除
  1. env {
  2.   job.mode = "BATCH"
  3.   parallelism = 2
  4. }
  5. source {
  6.     Jdbc {
  7.         url = "jdbc:mysql://[服务器ip]:3306/[数据库]?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  8.                 driver = "com.mysql.cj.jdbc.Driver"
  9.                 user = "[账户]"
  10.                 password = "[密码]",
  11.         # dctime要转换成字符串,因为hive建表的时候,这个字段是字符串;分区字段也要加到查询里面,后面SeaTunnel sink的时候会自动处理
  12.                 query = "select fd_id, fd_app_name, fd_model_name, fd_model_id, fd_key, fd_parameter1, fd_parameter2, fd_create_time, fd_subject, fd_type, fd_link, fd_mobile_link, fd_pad_link, fd_bundle, fd_replace_text, fd_md5, fd_del_flag, fd_level, doc_creator_id, fd_extend_content, fd_lang, fd_cate_name, fd_cate_id, fd_template_name, fd_template_id, fd_hierarchy_id, cast(date_format(fd_create_time, '%Y-%m-%d') as char) as dctime from sys_notify_todo_bak
  13.           }
  14. }
  15. transform {
  16. }
  17. sink {
  18.     S3File {
  19.       bucket = "s3a://seatunnel"
  20.       fs.s3a.endpoint = "[minio服务器域名/ip]:9000"
  21.       access_key = "[账户]"
  22.       secret_key = "[密码]"
  23.       fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
  24.       # 目录地址
  25.       path = "/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak"
  26.       tmp_path = "/data/tmp/seatunnel"
  27.       # 必须填写的不然会出现问题,因为本人的minio没有做ssl处理,所以要这么设置
  28.       hadoop_s3_properties {
  29.         "fs.s3a.connection.ssl.enabled" = "false"
  30.         "fs.s3a.path.style.access" = "true"
  31.       }
  32.       # parquet文件格式
  33.       file_format_type = "parquet"
  34.       # 必须用\\代表\
  35.       field_delimiter = "\\001"
  36.       # parquet格式必须要加,否则会出问题
  37.       parquet_avro_write_timestamp_as_int96 = true
  38.       # 压缩算法
  39.       compress_codec = "zstd"
  40.       have_partition = true
  41.       partition_by = ["dctime"]
  42.       partition_dir_expression = "${k0}=${v0}"
  43.       is_partition_field_write_in_file = false
  44.       schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  45.       data_save_mode="APPEND_DATA"
  46.       custom_filename = true
  47.       file_name_expression = "${transactionId}_${now}"
  48.       filename_time_format = "yyyy.MM.dd"
  49.   }
  50. }
复制代码
第二步:执行SeaTunnel同步,并在Hive查询里执行下列操作
  1. -- 刷新物理目录分析
  2. MSCK REPAIR TABLE ods_ekp.ods_sys_notify_todo_bak;
  3. -- 查询hive表确认是否有数据
  4. SELECT * from ods_ekp.ods_sys_notify_todo_bak limit 100;
复制代码
第三步:创建Doris Hive catalog外部库
  1. CREATE CATALOG hive PROPERTIES (
  2.     'type'='hms',
  3.     'hive.metastore.uris' = 'thrift://[hive metastore server的ip]:9083',
  4.     "s3.endpoint" = "http://[minio服务器域名/ip]:9000",
  5.     "s3.region" = "us-east-1",
  6.     "s3.access_key" = "[账户]",
  7.     "s3.secret_key" = "[密码]",
  8.     "s3.connection.ssl.enabled" = "false",
  9.     "use_path_style" = "true",
  10.     "hive.version" = '2.1.1'
  11. );
  12. REFRESH CATALOG hive;
  13. show databases from hive;
  14. SELECT * from hive.ods_ekp.ods_sys_notify_todo_bak limit 100
复制代码
说明:

  • 因为本人用的CDH6.3.2版本,Hive是2.1.1版本,所以建立catalog的时候,需要指定"hive.version" = '2.1.1'。
  • 因为本人设置的minio没有ssl,所以配置的时候需要加上"s3.connection.ssl.enabled" = "false"。
  • Minio用的是path风格,所以需要配置"use_path_style" = "true"。
  • SeaTunnel版本: 2.3.11
  • Doris版本:2.0.15

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册