[Hands-On] Shell Scheduling Made Easy: A Detailed DolphinScheduler + ProcessBuilder Tutorial

訾懵 · 6 days ago
This article walks through how DolphinScheduler uses ProcessBuilder to execute shell commands. By default, BashShellInterceptorBuilder wraps the shell script and generates the launch command, supporting both normal mode and sudo mode. A Spring Boot example then shows how to configure the working directory, merge the error stream into standard output, monitor execution status, and print log output, enabling efficient management and scheduling of shell tasks.
1. Using ProcessBuilder in DolphinScheduler

1.1 Wrapping the Command

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
public class ShellInterceptorBuilderFactory {

    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

    @SuppressWarnings("unchecked")
    public static IShellInterceptorBuilder newBuilder() {
        // TODO: by default this branch is taken
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
            return new BashShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
            return new ShShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
            return new CmdShellInterceptorBuilder();
        }
        throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
    }
}
The default path uses BashShellInterceptorBuilder:
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder
public class BashShellInterceptorBuilder
        extends
            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {

    @Override
    public BashShellInterceptorBuilder newBuilder() {
        return new BashShellInterceptorBuilder();
    }

    @Override
    public BashShellInterceptor build() throws FileOperateException, IOException {
        // TODO: this is the core point where the shell script is generated
        generateShellScript();
        List<String> bootstrapCommand = generateBootstrapCommand();
        // TODO: instantiate BashShellInterceptor
        return new BashShellInterceptor(bootstrapCommand, shellDirectory);
    }

    // When not running via sudo, this supplies the interpreter prefix used to execute the command
    @Override
    protected String shellInterpreter() {
        return "bash";
    }

    @Override
    protected String shellExtension() {
        return ".sh";
    }

    @Override
    protected String shellHeader() {
        return "#!/bin/bash";
    }
}
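Before the bootstrap command is assembled, `build()` calls `generateShellScript()` to write the wrapped task script to disk, using the header and extension the builder supplies above. The exact script contents are assembled by the base builder; the following is only a rough, hypothetical sketch of the idea (the file body and permissions here are invented for illustration):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;

public class ScriptGenDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical sketch: write a script file with the bash header and .sh extension,
        // then make it executable so the interceptor can launch it.
        Path script = Files.createTempFile("demo_", ".sh");
        Files.write(script, "#!/bin/bash\necho hello\n".getBytes());
        Files.setPosixFilePermissions(script,
                EnumSet.of(PosixFilePermission.OWNER_READ,
                        PosixFilePermission.OWNER_WRITE,
                        PosixFilePermission.OWNER_EXECUTE));
        System.out.println(Files.readAllLines(script).get(0));
    }
}
```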
org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand
protected List<String> generateBootstrapCommand() {
    if (sudoEnable) {
        // TODO: this is the default path; effectively `sudo -u <tenant> -i /opt/xx.sh`
        return bootstrapCommandInSudoMode();
    }
    // TODO: `bash /opt/xx.sh`
    return bootstrapCommandInNormalMode();
}
bootstrapCommandInSudoMode():
private List<String> bootstrapCommandInSudoMode() {
    if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
        return bootstrapCommandInResourceLimitMode();
    }
    List<String> bootstrapCommand = new ArrayList<>();
    bootstrapCommand.add("sudo");
    if (StringUtils.isNotBlank(runUser)) {
        bootstrapCommand.add("-u");
        bootstrapCommand.add(runUser);
    }
    bootstrapCommand.add("-i");
    bootstrapCommand.add(shellAbsolutePath().toString());
    return bootstrapCommand;
}
bootstrapCommandInNormalMode():
private List<String> bootstrapCommandInNormalMode() {
    List<String> bootstrapCommand = new ArrayList<>();
    bootstrapCommand.add(shellInterpreter());
    bootstrapCommand.add(shellAbsolutePath().toString());
    return bootstrapCommand;
}
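Putting the two branches together, the launch command ends up as either `sudo -u <tenant> -i /path/script.sh` or `bash /path/script.sh`. The list-building logic can be reproduced in a minimal standalone sketch (the tenant name and script path below are made up):

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapCommandDemo {

    // Mirrors bootstrapCommandInSudoMode(): sudo [-u runUser] -i <script>
    static List<String> sudoMode(String runUser, String scriptPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add("sudo");
        if (runUser != null && !runUser.isEmpty()) {
            cmd.add("-u");
            cmd.add(runUser);
        }
        cmd.add("-i");
        cmd.add(scriptPath);
        return cmd;
    }

    // Mirrors bootstrapCommandInNormalMode(): <interpreter> <script>
    static List<String> normalMode(String interpreter, String scriptPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add(interpreter);
        cmd.add(scriptPath);
        return cmd;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", sudoMode("tenant", "/opt/xx.sh")));
        System.out.println(String.join(" ", normalMode("bash", "/opt/xx.sh")));
    }
}
```

The `-i` flag makes sudo start a login shell for the tenant user, so the script runs with that user's environment, which is why DolphinScheduler can isolate tasks per tenant.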
1.2 Executing the Command

org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor
public abstract class BaseShellInterceptor implements IShellInterceptor {

    protected final String workingDirectory;
    protected final List<String> executeCommands;

    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
        this.executeCommands = executeCommands;
        this.workingDirectory = workingDirectory;
    }

    @Override
    public Process execute() throws IOException {
        // init process builder
        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        // TODO: set the working directory so the script can load resources (e.g. jar files) from that location
        processBuilder.directory(new File(workingDirectory));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        log.info("Executing shell command : {}", String.join(" ", executeCommands));
        return processBuilder.start();
    }
}
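Note the `redirectErrorStream(true)` call: it folds stderr into stdout, so a single reader drains both streams and the child process cannot block on a full, unread stderr pipe. A small standalone demo of the merge (assumes `bash` is available on the machine):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class MergedStreamDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        // One line to stdout, one line to stderr
        ProcessBuilder pb = new ProcessBuilder("bash", "-c", "echo out; echo err 1>&2");
        // With the merge enabled, both lines arrive on the process's input stream
        pb.redirectErrorStream(true);
        Process p = pb.start();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line);
            }
        }
        p.waitFor();
    }
}
```

Without the merge, you would need a second thread to drain `p.getErrorStream()`; otherwise a chatty script could fill the stderr pipe buffer and hang.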
2. A Best-Practice Example

2.1 pom.xml Configuration
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
  <version>2.6.1</version>
</dependency>
2.2 Application Code
@SpringBootApplication
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);

        List<String> executeCommands = new ArrayList<>();
        executeCommands.add("sudo");
        executeCommands.add("-u");
        executeCommands.add("qiaozhanwei");
        executeCommands.add("-i");
        executeCommands.add("/opt/test/my.sh");

        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        // Set the working directory so the script can load resources (e.g. jar files) from that location
        processBuilder.directory(new File("/opt/test"));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        Process process = processBuilder.start();

        try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = inReader.readLine()) != null) {
                // Print the child process output to the console
                System.out.println(line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Wait up to 10 minutes; if the process has not exited by then, waitFor returns false
        boolean status = process.waitFor(10, TimeUnit.MINUTES);
        System.out.println("status ->" + status);
    }
}
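One caveat with the example above: `waitFor(10, TimeUnit.MINUTES)` only stops waiting when the timeout fires; it does not kill the child process. If you want the task to actually stop on timeout, terminate it explicitly. A sketch of that pattern, using `sleep` as a stand-in for a long-running script:

```java
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in workload: a process that runs longer than our timeout
        Process p = new ProcessBuilder("sleep", "5").start();

        // waitFor with a timeout returns false if the process is still alive
        boolean finished = p.waitFor(1, TimeUnit.SECONDS);
        if (!finished) {
            // Ask the process to terminate (SIGTERM on POSIX systems)...
            p.destroy();
            // ...and escalate to a forcible kill if it ignores the request
            if (!p.waitFor(3, TimeUnit.SECONDS)) {
                p.destroyForcibly();
            }
        }
        System.out.println("finished -> " + finished);
        // prints "finished -> false"
    }
}
```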
2.3 Log Output
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)
2024-06-15 18:33:16.090  INFO 31834 --- [           main] com.journey.test.Application             : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in /Users/qiaozhanwei/IdeaProjects/springboot2)
2024-06-15 18:33:16.091  INFO 31834 --- [           main] com.journey.test.Application             : No active profile set, falling back to default profiles: default
2024-06-15 18:33:16.244  INFO 31834 --- [           main] com.journey.test.Application             : Started Application in 0.252 seconds (JVM running for 0.42)
Number of Maps  = 1
Samples per Map = 100000
2024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Wrote input for Map #0
Starting Job
2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:8032
2024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_0931
2024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 1
2024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:1
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_0931
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found
2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_0931
2024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/
2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_0931
2024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false
2024-06-15 18:33:24,978 INFO mapreduce.Job:  map 0% reduce 0%
2024-06-15 18:33:29,153 INFO mapreduce.Job:  map 100% reduce 0%
2024-06-15 18:33:34,384 INFO mapreduce.Job:  map 100% reduce 100%
2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully
2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54
    File System Counters
        FILE: Number of bytes read=28
        FILE: Number of bytes written=548863
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=278
        HDFS: Number of bytes written=215
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=3
        HDFS: Number of bytes read erasure-coded=0
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=37968
        Total time spent by all reduces in occupied slots (ms)=79360
        Total time spent by all map tasks (ms)=2373
        Total time spent by all reduce tasks (ms)=2480
        Total vcore-milliseconds taken by all map tasks=2373
        Total vcore-milliseconds taken by all reduce tasks=2480
        Total megabyte-milliseconds taken by all map tasks=4859904
        Total megabyte-milliseconds taken by all reduce tasks=10158080
    Map-Reduce Framework
        Map input records=1
        Map output records=2
        Map output bytes=18
        Map output materialized bytes=28
        Input split bytes=160
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=28
        Reduce input records=2
        Reduce output records=0
        Spilled Records=4
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=87
        CPU time spent (ms)=1420
        Physical memory (bytes) snapshot=870387712
        Virtual memory (bytes) snapshot=9336647680
        Total committed heap usage (bytes)=2716860416
        Peak Map Physical memory (bytes)=457416704
        Peak Map Virtual memory (bytes)=3773362176
        Peak Reduce Physical memory (bytes)=412971008
        Peak Reduce Virtual memory (bytes)=5563285504
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=118
    File Output Format Counters
        Bytes Written=97
Job Finished in 17.292 seconds
Estimated value of Pi is 3.14120000000000000000
status ->true
Process finished with exit code 0
Reposted from Journey.
Original link: https://segmentfault.com/a/1190000044966157
Published with the support of 白鲸开源 (WhaleOps).
