【Title】: Use Spring together with Spark
【Posted】: 2015-07-15 05:09:58
【Question】:

I am developing a Spark application, and I am used to Spring as a dependency injection framework. Now I have run into a problem: the processing part uses Spring's @Autowired functionality, but the object it lives in is serialized and deserialized by Spark.

So the following code gets me into trouble:

Processor processor = ...; // This is a Spring constructed object
                           // and makes all the trouble
JavaRDD<Txn> rdd = ...; // some data for Spark
rdd.foreachPartition(processor);

The processor looks like this:

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired // This will not work if the object is deserialized
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        ... // do some fancy stuff
        db.store(txns);
    }
}

So my question is: is it possible to use Spring in combination with Spark at all? If not, what would be the most elegant way to do something like this? Any help is appreciated!

【Comments】:

  • If the problem is that you deserialize the object and @Autowired only runs on first initialization, then technically you could obtain the ApplicationContext and force it to inject your transient fields manually (a sketch follows below).
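
A minimal sketch of that idea, assuming a statically accessible holder for the ApplicationContext. The SpringContextHolder class here is hypothetical, but AutowireCapableBeanFactory.autowireBean is Spring's standard API for injecting into an instance Spring did not construct:

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

// Hypothetical holder: captures the ApplicationContext at startup so that
// objects Spring did not construct (e.g. deserialized ones) can still reach it.
@Component
public class SpringContextHolder implements ApplicationContextAware {
    private static volatile ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        context = applicationContext;
    }

    // Re-runs field injection (@Autowired etc.) on an already constructed instance.
    public static void autowire(Object target) {
        context.getAutowireCapableBeanFactory().autowireBean(target);
    }
}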

Tags: java spring apache-spark spark-streaming


【Solution 1】:

From the asker (addendum): to hook into the deserialization step directly without modifying your own classes, use the spring-spark project by parapluplu. That project autowires your bean when it gets deserialized by Spark; a sketch of the underlying idea follows below.
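
The gist of that approach, sketched against the Processor from the question (Txn and DatabaseConnection are the types from the question). The readObject hook is standard Java serialization, which Spark uses for closures; SpringContextHolder is the hypothetical helper from the comments above, and the sketch assumes a Spring context is already running in each executor JVM, which the linked project takes care of:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.api.java.function.VoidFunction;
import org.springframework.beans.factory.annotation.Autowired;

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired
    private transient DatabaseConnection db;

    // Java serialization hook: runs on the executor right after the default
    // deserialization, so the transient dependency can be injected again.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        SpringContextHolder.autowire(this); // hypothetical helper, see above
    }

    @Override
    public void call(Iterator<Txn> txns) {
        db.store(txns);
    }
}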


EDIT:

To use Spark, you need the following setup (also shown in this repository):

  • Spring Boot + Spark:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
    <relativePath/>
    <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- fix java.lang.ClassNotFoundException: org.codehaus.commons.compiler.UncheckedCompileException -->
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>

</dependencies>

Then you need an application class, just like with any Spring Boot application:

@SpringBootApplication
public class SparkExperimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(SparkExperimentApplication.class, args);
    }
}

And then a configuration that ties it all together:

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {

    @Autowired
    private Environment env;

    @Value("${app.name:jigsaw}")
    private String appName;

    @Value("${spark.home}")
    private String sparkHome;

    @Value("${master.uri:local}")
    private String masterUri;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);

        return sparkConf;
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName("Java Spark SQL basic example")
                .getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}

Then you can use the SparkSession class to talk to Spark SQL:

/**
 * Created by achat1 on 9/23/15.
 * Just an example to see if it works.
 */
@Component
public class WordCount {
    @Autowired
    private SparkSession sparkSession;

    public List<Count> count() {
        String input = "hello world hello hello hello";
        List<Word> words = Arrays.stream(input.split(" "))
                .map(Word::new)
                .collect(Collectors.toList());

        // Build a DataFrame from the list of beans and group it by word.
        Dataset<Row> dataFrame = sparkSession.createDataFrame(words, Word.class);
        dataFrame.show();

        Dataset<Row> counts = dataFrame.groupBy(col("word")).count();
        counts.show();

        // Collect the grouped rows back to the driver and map them to Count beans.
        return counts.collectAsList().stream()
                .map(row -> new Count(row.getString(0), row.getLong(1)))
                .collect(Collectors.toList());
    }
}

It refers to these two classes:

public class Word {
    private String word;

    public Word() {
    }

    public Word(String word) {
        this.word = word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }
}

public class Count {
    private String word;
    private long count;

    public Count() {
    }

    public Count(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}

Then you can run it and see that it returns the right data:

@RequestMapping("api")
@Controller
public class ApiController {
    @Autowired
    WordCount wordCount;

    @RequestMapping("wordcount")
    public ResponseEntity<List<Count>> words() {
        return new ResponseEntity<>(wordCount.count(), HttpStatus.OK);
    }
}

[{"word":"hello","count":4},{"word":"world","count":1}]

【Discussion】:

  • I would really like to keep the Processor class unchanged. Isn't there some way to hook into the deserializer and inject the dependencies there?
  • I don't know; from what I understand this is the only solution I can think of, and it may not be the best one :)
  • OK, but it does help me. I will use it in a slightly different way: I will inject the dependencies after the automatic deserialization, so I will extend the Kryo serialization class. Once that is done I will edit your answer, since that solution will be based on yours.
  • OK, I created a project for this on GitHub and linked it in your post. I hope that is fine with you. Thanks a lot for the hint!
  • @EpicPandaForce I am running some tests, and it seems you can do this by setting .config("spark.jars", "target/simple-project-1.0.jar") in addition to the master URL you suggested (see the sketch below). Thanks anyway.
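
As an aside, a minimal sketch of what that last comment describes: shipping the application jar to the executors through spark.jars when the session is built. The app name and master URL are placeholders, and the jar path is the hypothetical one from the comment:

import org.apache.spark.sql.SparkSession;

public class SessionFactory {
    public static SparkSession create() {
        // spark.jars distributes the listed jars to the executors, so classes
        // like Processor can be loaded and deserialized on the worker side.
        return SparkSession.builder()
                .appName("spring-spark-example")            // hypothetical name
                .master("spark://master-host:7077")         // placeholder master URL
                .config("spark.jars", "target/simple-project-1.0.jar")
                .getOrCreate();
    }
}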