本文将介绍在DolphinScheduler中使用ProcessBuilder执行Shell命令的方法。默认通过BashShellInterceptorBuilder封装Shell脚本并生成执行命令,支持普通模式和sudo模式运行。同时,结合Spring Boot应用示例,展示了如何配置工作目录、合并错误流、监控执行状态,并输出日志信息,从而实现对Shell任务的高效管理和调度。
1、ProcessBuilder DolphinScheduler中的使用
1.1、命令的封装
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory- public class ShellInterceptorBuilderFactory {
- private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
- @SuppressWarnings("unchecked")
- public static IShellInterceptorBuilder newBuilder() {
- // TODO 默认的走的是这个逻辑
- if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
- return new BashShellInterceptorBuilder();
- }
- if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
- return new ShShellInterceptorBuilder();
- }
- if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
- return new CmdShellInterceptorBuilder();
- }
- throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
- }
- }
复制代码 默认走的是 BashShellInterceptorBuilder。
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder- public class BashShellInterceptorBuilder
- extends
- BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {
- @Override
- public BashShellInterceptorBuilder newBuilder() {
- return new BashShellInterceptorBuilder();
- }
- @Override
- public BashShellInterceptor build() throws FileOperateException, IOException {
- // TODO 这里是生成shell脚本的核心点
- generateShellScript();
- List<String> bootstrapCommand = generateBootstrapCommand();
- // TODO 实例化BashShellInterceptor
- return new BashShellInterceptor(bootstrapCommand, shellDirectory);
- }
- // 这个是如果不是sudo的方式,进行命令执行的前缀
- @Override
- protected String shellInterpreter() {
- return "bash";
- }
- @Override
- protected String shellExtension() {
- return ".sh";
- }
- @Override
- protected String shellHeader() {
- return "#!/bin/bash";
- }
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand- protected List<String> generateBootstrapCommand() {
- if (sudoEnable) {
- // TODO 默认是走这里的,其实就是sudo -u 租户 -i /opt/xx.sh
- return bootstrapCommandInSudoMode();
- }
- // TODO bash /opt/xx.sh
- return bootstrapCommandInNormalMode();
- }
复制代码 bootstrapCommandInSudoMode():- private List<String>
- bootstrapCommandInSudoMode() {
- if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
- return bootstrapCommandInResourceLimitMode();
- }
- List<String> bootstrapCommand = new ArrayList<>();
- bootstrapCommand.add("sudo");
- if (StringUtils.isNotBlank(runUser)) {
- bootstrapCommand.add("-u");
- bootstrapCommand.add(runUser);
- }
- bootstrapCommand.add("-i");
- bootstrapCommand.add(shellAbsolutePath().toString());
- return bootstrapCommand;
- }
复制代码 bootstrapCommandInNormalMode():- private List<String> bootstrapCommandInNormalMode() {
- List<String> bootstrapCommand = new ArrayList<>();
- bootstrapCommand.add(shellInterpreter());
- bootstrapCommand.add(shellAbsolutePath().toString());
- return bootstrapCommand;
- }
复制代码 1.2、命令的执行
org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor- public abstract class BaseShellInterceptor implements IShellInterceptor {
- protected final String workingDirectory;
- protected final List<String> executeCommands;
- protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
- this.executeCommands = executeCommands;
- this.workingDirectory = workingDirectory;
- }
- @Override
- public Process execute() throws IOException {
- // init process builder
- ProcessBuilder processBuilder = new ProcessBuilder();
- // setting up a working directory
- // TODO 设置工作路径,目的其实就是在执行脚本的时候,可以在该目录的位置来加载比如说jar包什么的
- processBuilder.directory(new File(workingDirectory));
- // merge error information to standard output stream
- processBuilder.redirectErrorStream(true);
- processBuilder.command(executeCommands);
- log.info("Executing shell command : {}", String.join(" ", executeCommands));
- return processBuilder.start();
- }
- }
复制代码 2、最佳实践实例
2.1、pom.xml配置
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter</artifactId>
- <version>2.6.1</version>
- </dependency>
复制代码 2.2、pom.xml配置
- @SpringBootApplication
- public class Application {
- public static void main(String[] args) throws Exception {
- SpringApplication.run(Application.class, args);
- List<String> executeCommands = new ArrayList<>();
- executeCommands.add("sudo");
- executeCommands.add("-u");
- executeCommands.add("qiaozhanwei");
- executeCommands.add("-i");
- executeCommands.add("/opt/test/my.sh");
- ProcessBuilder processBuilder = new ProcessBuilder();
- // setting up a working directory
- // TODO 设置工作路径,目的其实就是在执行脚本的时候,可以在该目录的位置来加载比如说jar包什么的
- processBuilder.directory(new File("/opt/test"));
- // merge error information to standard output stream
- processBuilder.redirectErrorStream(true);
- processBuilder.command(executeCommands);
- Process process = processBuilder.start();
- try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
- String line;
- while ((line = inReader.readLine()) != null) {
- // TODO 终端日志输出
- System.out.println(line);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- // TODO 等10分钟,如果10分钟不结束,返回且status为false
- boolean status = process.waitFor(10, TimeUnit.MINUTES);
- System.out.println("status ->" + status);
- }
- }
复制代码 2.3、日志输出结果
- . ____ _ __ _ _
- /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
- ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
- \\/ ___)| |_)| | | | | || (_| | ) ) ) )
- ' |____| .__|_| |_|_| |_\__, | / / / /
- =========|_|==============|___/=/_/_/_/
- :: Spring Boot :: (v2.6.1)
- 2024-06-15 18:33:16.090 INFO 31834 --- [ main] com.journey.test.Application : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in /Users/qiaozhanwei/IdeaProjects/springboot2)
- 2024-06-15 18:33:16.091 INFO 31834 --- [ main] com.journey.test.Application : No active profile set, falling back to default profiles: default
- 2024-06-15 18:33:16.244 INFO 31834 --- [ main] com.journey.test.Application : Started Application in 0.252 seconds (JVM running for 0.42)
- Number of Maps = 1
- Samples per Map = 100000
- 2024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- Wrote input for Map #0
- Starting Job
- 2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:8032
- 2024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_0931
- 2024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 1
- 2024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:1
- 2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_0931
- 2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []
- 2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found
- 2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
- 2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_0931
- 2024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/
- 2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_0931
- 2024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false
- 2024-06-15 18:33:24,978 INFO mapreduce.Job: map 0% reduce 0%
- 2024-06-15 18:33:29,153 INFO mapreduce.Job: map 100% reduce 0%
- 2024-06-15 18:33:34,384 INFO mapreduce.Job: map 100% reduce 100%
- 2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully
- 2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54
- File System Counters
- FILE: Number of bytes read=28
- FILE: Number of bytes written=548863
- FILE: Number of read operations=0
- FILE: Number of large read operations=0
- FILE: Number of write operations=0
- HDFS: Number of bytes read=278
- HDFS: Number of bytes written=215
- HDFS: Number of read operations=9
- HDFS: Number of large read operations=0
- HDFS: Number of write operations=3
- HDFS: Number of bytes read erasure-coded=0
- Job Counters
- Launched map tasks=1
- Launched reduce tasks=1
- Data-local map tasks=1
- Total time spent by all maps in occupied slots (ms)=37968
- Total time spent by all reduces in occupied slots (ms)=79360
- Total time spent by all map tasks (ms)=2373
- Total time spent by all reduce tasks (ms)=2480
- Total vcore-milliseconds taken by all map tasks=2373
- Total vcore-milliseconds taken by all reduce tasks=2480
- Total megabyte-milliseconds taken by all map tasks=4859904
- Total megabyte-milliseconds taken by all reduce tasks=10158080
- Map-Reduce Framework
- Map input records=1
- Map output records=2
- Map output bytes=18
- Map output materialized bytes=28
- Input split bytes=160
- Combine input records=0
- Combine output records=0
- Reduce input groups=2
- Reduce shuffle bytes=28
- Reduce input records=2
- Reduce output records=0
- Spilled Records=4
- Shuffled Maps =1
- Failed Shuffles=0
- Merged Map outputs=1
- GC time elapsed (ms)=87
- CPU time spent (ms)=1420
- Physical memory (bytes) snapshot=870387712
- Virtual memory (bytes) snapshot=9336647680
- Total committed heap usage (bytes)=2716860416
- Peak Map Physical memory (bytes)=457416704
- Peak Map Virtual memory (bytes)=3773362176
- Peak Reduce Physical memory (bytes)=412971008
- Peak Reduce Virtual memory (bytes)=5563285504
- Shuffle Errors
- BAD_ID=0
- CONNECTION=0
- IO_ERROR=0
- WRONG_LENGTH=0
- WRONG_MAP=0
- WRONG_REDUCE=0
- File Input Format Counters
- Bytes Read=118
- File Output Format Counters
- Bytes Written=97
- Job Finished in 17.292 seconds
- Estimated value of Pi is 3.14120000000000000000
- status ->true
- Process finished with exit code 0
复制代码 转载自Journey
原文链接:https://segmentfault.com/a/1190000044966157
本文由 白鲸开源 提供发布支持!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |