1. Recently I have been working on a platform, a project with separated front end and back end. Simply put, it integrates the various components, so submitting jobs and the like is all done through the platform.
2. The feature implemented here is quite simple: from code, kill a Flink job running on YARN, and automatically generate a savepoint while doing so.
3. The parameters we need to pass in are:
1) the YARN application id
String appId = "application_1600222031782_0023";
2) the Flink job id
String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";
3) the savepoint target path
String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";
pom dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
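The ${scala.binary.version} and ${flink.version} placeholders come from the project's <properties> block; a minimal sketch (the versions below are only assumptions, use the Flink/Scala versions your cluster actually runs):
<properties>
    <!-- assumed versions, adjust to match your cluster -->
    <flink.version>1.11.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>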
If it does not work, set the HADOOP_HOME environment variable when running the program (the following is for reference only).
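For example, a rough sketch of exporting the Hadoop environment before launching (the CDH-style paths are assumptions, point them at your own installation):
# assumed CDH layout; replace with your own Hadoop installation paths
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`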
4. Code
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class StopYarnJob {

    public static void main(String[] args) throws FlinkException, CliArgsException, ExecutionException, InterruptedException {
        // print HADOOP_HOME for debugging (read from the environment)
        String hadoopHome = System.getenv("HADOOP_HOME");
        System.out.println("hadoop_home = " + hadoopHome);

        // directory containing flink-conf.yaml plus the yarn/hdfs client configs
        String configurationDirectory = "G:/flink_working_tools/yarn-conf";
        String appId = "application_1600222031782_0023";
        String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";
        String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";

        // load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
        // Configuration flinkConfiguration = new Configuration();
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);

        // resolve the YARN application id from the configuration
        YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
        ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);
        if (applicationId == null) {
            throw new FlinkException(
                    "No cluster id was specified. Please specify a cluster to which you would like to connect.");
        }

        // connect to the running YARN cluster and obtain a client for it
        YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConfiguration);
        ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(applicationId).getClusterClient();

        // stop the job gracefully with a savepoint; the future completes with the savepoint path
        JobID jobID = parseJobId(jobid);
        CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(jobID, true, savePoint);
        String savepoint = completableFuture.get();
        System.out.println(savepoint);

        // release client and descriptor resources
        clusterClient.close();
        clusterDescriptor.close();
    }

    private static JobID parseJobId(String jobIdString) throws CliArgsException {
        if (jobIdString == null) {
            throw new CliArgsException("Missing JobId");
        }
        final JobID jobId;
        try {
            jobId = JobID.fromHexString(jobIdString);
        } catch (IllegalArgumentException e) {
            throw new CliArgsException(e.getMessage());
        }
        return jobId;
    }
}
5. Test results
1) First we take the example code below, package it, and submit it to the cluster:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

public class SocketWindowWordCount {

    private static final Logger logger = Logger.getLogger(SocketWindowWordCount.class);

    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount "
                    + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                    + "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and "
                    + "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        text.print("source");

        // parse the data, key it, and keep a running total in keyed state
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy(value -> "aaa")
                .process(new KeyedProcessFunction<String, WordWithCount, WordWithCount>() {

                    private transient ValueState<Long> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>(
                                "totalAmountState", TypeInformation.of(new TypeHint<Long>() {}));
                        valueState = this.getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(WordWithCount wordWithCount, Context context, Collector<WordWithCount> out) throws Exception {
                        Long value = valueState.value();
                        Long counts = wordWithCount.count;
                        if (value != null) {
                            System.out.println("in-memory state = " + value);
                            logger.error("in-memory state = " + value);
                            value = value + counts;
                            valueState.update(value);
                            out.collect(wordWithCount);
                        } else {
                            value = counts;
                            valueState.update(value);
                        }
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
2) Run the job with the following script:
export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*
./bin/flink run -m yarn-cluster \
    -yD yarn.containers.vcores=2 \
    ./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345
3) On the Linux node 192.168.6.31 run nc -l 12345 and type in some data at random.
4) At this point, check the output printed by the YARN task; suppose the last printed state = 11.
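One way to look at that output (a sketch, reusing the application id from above): while the job is running, the TaskManager stdout page of the Flink web UI shows it; once the YARN logs have been aggregated you can also pull it with:
yarn logs -applicationId application_1600222031782_0023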
5) Run the main method above to kill the job, then check HDFS: a savepoint file has been generated.
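For example, the savepoint directory can be listed like this (a sketch, reusing the savePoint path configured above):
hdfs dfs -ls hdfs://dev-ct6-dc-master01:8020/flink-savepoints5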
6) Start the job again from the savepoint, run nc -l 12345 and enter one more piece of data: the printed state picks up from the previous run at state = 12, which verifies that the savepoint works.
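Restarting from the savepoint can be done with the -s/--fromSavepoint option of the Flink CLI; a sketch, where <savepoint-path> stands for the concrete path printed by the main method above:
./bin/flink run -m yarn-cluster \
    -s <savepoint-path> \
    ./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345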