Spring Boot 提供与各种系统的集成,包括 Spark、Hadoop、YARN、Kafka、JDBC 数据库。
例如,我有这个application.properties
# Legacy Boot property that disables the web environment — this app runs Spark
# tasks rather than serving HTTP. (NOTE(review): replaced by
# spring.main.web-application-type=none in newer Spring Boot — confirm version.)
spring.main.web-environment=false
# Consumed by ApplicationConfig via @Value("${appName:...}") etc.
appName=spring-spark
sparkHome=/Users/username/Applications/spark-2.2.1-bin-hadoop2.7
masterUri=local
这是一个应用程序类
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.env.Environment;
@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {

    // NOTE(review): in a Spring Boot app application.properties is loaded
    // automatically, so the @PropertySource above is redundant (but harmless);
    // it is kept so the class also works in a plain-Spring context.

    /** Spark application name; falls back to "Spark Example" if unset. */
    @Value("${appName:Spark Example}")
    private String appName;

    /** Local Spark installation directory (required, no default). */
    @Value("${sparkHome}")
    private String sparkHome;

    /** Spark master URI; defaults to local mode. */
    @Value("${masterUri:local}")
    private String masterUri;

    /** Spark configuration assembled from the injected properties. */
    @Bean
    public SparkConf sparkConf() {
        return new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);
    }

    /**
     * Java-friendly Spark context.
     *
     * <p>The {@link SparkConf} bean is injected as a method parameter instead of
     * calling {@code sparkConf()} directly: direct calls only return the singleton
     * via the CGLIB configuration proxy, whereas parameter injection is explicit
     * and also works with {@code @Configuration(proxyBeanMethods = false)}.
     */
    @Bean
    public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
        return new JavaSparkContext(sparkConf);
    }

    /**
     * Shared {@link SparkSession} built on top of the existing context.
     *
     * <p>NOTE(review): {@code SparkSession.Builder#sparkContext} is
     * package-private in some Spark releases — confirm it is callable from Java
     * with the Spark version on the classpath.
     */
    @Bean
    public SparkSession sparkSession(JavaSparkContext javaSparkContext) {
        return SparkSession.builder()
                .appName(appName)
                .master(masterUri)
                .sparkContext(javaSparkContext.sc())
                .getOrCreate();
    }

    /**
     * Resolves {@code ${...}} placeholders in {@code @Value} annotations.
     * Declared {@code static} so it runs before this configuration class is
     * processed, as required for {@code BeanFactoryPostProcessor} beans.
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
taskContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--List out all tasks here-->
<!-- Each task bean is constructor-injected with the sparkSession bean
     declared in ApplicationConfig. -->
<bean id="exampleSparkTask" class="com.example.spark.task.SampleSparkTask">
<constructor-arg ref="sparkSession" />
</bean>
</beans>
应用程序
@SpringBootApplication
@ImportResource("classpath:taskContext.xml")
public class App {

    /**
     * Entry point: boots the Spring context, which also pulls in the task
     * beans declared in {@code taskContext.xml}.
     */
    public static void main(String[] args) {
        final SpringApplication application = new SpringApplication(App.class);
        application.run(args);
    }
}
下面是实际执行 Spark 作业的 Scala 代码:
// Spring Boot ApplicationRunner executed at startup (order 1); extends
// Serializable so Spark can ship closures that reference this class.
@Order(1)
class SampleSparkTask(sparkSession: SparkSession) extends ApplicationRunner with Serializable {
// for spark streaming
// @transient keeps the non-serializable StreamingContext out of any
// serialized closure; batch interval is 3 seconds.
@transient val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(3))
// Brings this session's Dataset encoders / implicit conversions into scope.
import sparkSession.implicits._
@throws[Exception]
override def run(args: ApplicationArguments): Unit = {
// spark code here
// Called once by Spring Boot after the context is fully started.
}
}
在此基础上,您可以在任务类中通过 @Autowired 注入所需的其他 bean。