【Posted】: 2021-07-27 03:06:33
【Question】:
I am writing a Kafka source connector based on a working producer that I use for audio files. The connector starts, but nothing happens: no errors, no data, and I am not sure whether this is a coding problem or a configuration problem.
The connector is supposed to read an entire directory and read each file as a byte array.
Configuration class:
package hothman.example;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import java.util.Map;
public class AudioSourceConnectorConfig extends AbstractConfig {

    public static final String FILENAME_CONFIG = "fileName";
    private static final String FILENAME_DOC = "Enter the path of the audio files";

    public static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Enter the topic to write to..";

    public AudioSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
        super(config, parsedConfig);
    }

    public AudioSourceConnectorConfig(Map<String, String> parsedConfig) {
        this(conf(), parsedConfig);
    }

    public static ConfigDef conf() {
        return new ConfigDef()
                .define(FILENAME_CONFIG, Type.STRING, Importance.HIGH, FILENAME_DOC)
                .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC);
    }

    public String getFilenameConfig() {
        return this.getString("fileName");
    }

    public String getTopicConfig() {
        return this.getString("topic");
    }
}
Source connector class:
package hothman.example;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AudioSourceConnector extends SourceConnector {
    /*
      Your connector should never use System.out for logging. All of your classes should use slf4j
      for logging
    */
    private static Logger log = LoggerFactory.getLogger(AudioSourceConnector.class);

    private AudioSourceConnectorConfig config;

    private String filename;
    private String topic;

    @Override
    public String version() {
        return VersionUtil.getVersion();
    }

    @Override
    public void start(Map<String, String> props) {
        filename = config.getFilenameConfig();
        topic = config.getTopicConfig();
        if (topic == null || topic.isEmpty())
            throw new ConnectException("AudiSourceConnector configuration must include 'topic' setting");
        if (topic.contains(","))
            throw new ConnectException("AudioSourceConnector should only have a single topic when used as a source.");
    }

    @Override
    public Class<? extends Task> taskClass() {
        //TODO: Return your task implementation.
        return AudioSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configsList = new ArrayList<>();
        // Only one input stream makes sense.
        Map<String, String> configs = new HashMap<>();
        if (filename != null)
            configs.put(config.getFilenameConfig(), filename);
        configs.put(config.getTopicConfig(), topic);
        configsList.add(configs);
        return configsList;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return AudioSourceConnectorConfig.conf();
    }
}
SourceTask class:
package hothman.example;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import static com.sun.nio.file.ExtendedWatchEventModifier.FILE_TREE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
public class AudioSourceTask extends SourceTask {
    /*
      Your connector should never use System.out for logging. All of your classes should use slf4j
      for logging
    */
    static final Logger log = LoggerFactory.getLogger(AudioSourceTask.class);

    private AudioSourceConnectorConfig config;

    public static final String POSITION_FIELD = "position";
    private static final Schema VALUE_SCHEMA = Schema.BYTES_SCHEMA;

    private String filename;
    private String topic = null;
    private int offset = 0;

    private FileSystem fs = FileSystems.getDefault();
    private WatchService ws = fs.newWatchService();

    private Path dir;
    private File directoryPath;
    private ArrayList<File> listOfFiles;
    private byte[] temp = null;

    public AudioSourceTask() throws IOException {
    }

    @Override
    public String version() {
        return VersionUtil.getVersion();
    }

    @Override
    public void start(Map<String, String> props) {
        filename = config.getFilenameConfig();
        topic = config.getTopicConfig();
        if (topic == null)
            throw new ConnectException("AudioSourceTask config missing topic setting");

        dir = Paths.get(filename);
        try {
            dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
        } catch (IOException e) {
            e.printStackTrace();
        }
        directoryPath = new File(String.valueOf(dir));
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        //TODO: Create SourceRecord objects that will be sent the kafka cluster.
        listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));

        Map<String, Object> offset = context.offsetStorageReader().
                offset(Collections.singletonMap(config.getFilenameConfig(), filename));

        ArrayList<SourceRecord> records = new ArrayList<>(1);
        try {
            for (File file : listOfFiles) {
                // send existing files first
                temp = Files.readAllBytes(Paths.get(file.toString()));
                records.add(new SourceRecord(null,
                        null, topic, Schema.BYTES_SCHEMA, temp));
            }
            return records;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void stop() {
        //TODO: Do whatever is required to stop your task.
    }
}
Version class:
package hothman.example;
/**
* Created by jeremy on 5/3/16.
*/
class VersionUtil {
    public static String getVersion() {
        try {
            return VersionUtil.class.getPackage().getImplementationVersion();
        } catch (Exception ex) {
            return "0.0.0.0";
        }
    }
}
Connector.properties
name=AudioSourceConnector
tasks.max=1
connector.class=hothman.example.AudioSourceConnector
fileName = G:\\Files
topic= my-topic
Connect-standalone.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=G:/Kafka/kafka_2.12-2.8.0/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=G:/Kafka/kafka_2.12-2.8.0/plugins
Error:
[2021-05-05 01:24:27,926] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-05 01:24:27,928] ERROR WorkerSourceTask{id=AudioSourceConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.OutOfMemoryError: Java heap space
at java.nio.file.Files.read(Files.java:3099)
at java.nio.file.Files.readAllBytes(Files.java:3158)
at hothman.example.AudioSourceTask.poll(AudioSourceTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:273)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-05-05 01:24:27,929] INFO [Producer clientId=connector-producer-AudioSourceConnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1204)
[2021-05-05 01:24:27,933] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2021-05-05 01:24:27,934] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2021-05-05 01:24:27,934] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2021-05-05 01:24:27,935] INFO App info kafka.producer for connector-producer-AudioSourceConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-05-05 01:24:36,479] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
【Discussion】:
-
Have you tried using the logger you created to actually see where execution ends?
-
For starters, in the poll method you can write log.info("Reading file {}", file); if it never prints anything when you run the connector, you know execution never reaches that line, so add logging at the start of the taskConfigs method to print other useful information, or just log.info("got here"). If you have separate code that already works as expected, it is better to package it as a class that you import and expose through an interface rather than porting it directly into the Connect worker methods.
-
Found the problem! config.getFilenameConfig(); returns nothing, so for now I have hard-coded the path manually and the task runs, but I am hitting the error I posted in the question above. (A sketch of initializing the config follows the discussion.)
-
I solved the error above and the connector works now, all thanks to you telling me to use the logger. I also had to edit the connect-standalone.properties file and change the sizes of producer.max.request.size and producer.buffer.memory (see the properties sketch below). The only thing that still does not work is picking up newly added files: whenever I add a file to the directory the connector watches, I get "the process cannot access the file because it is being used by another process". I guess I have to give it a wait time, and I still need to find out why config.getFilenameConfig does not work.
-
If it is solved, feel free to post your solution below. And yes, you are not using the WatchService at all; one idea is to create an ArrayDeque that is filled by file-creation events and have your poll method take files from that queue instead of listing the full directory contents (a sketch of that idea follows the discussion). I am also not sure whether WatchService picks up pre-existing files in the first place... In any case, if you want to double the memory, you can fix the OOM error by exporting
KAFKA_HEAP_OPTS="-Xmx4g" before starting Connect.
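
As a concrete placement for the logging advice in the discussion (the exact messages are only illustrative, not from the original code):

    // In AudioSourceConnector.taskConfigs(int maxTasks), near the top:
    log.info("taskConfigs called: maxTasks={}, filename={}, topic={}", maxTasks, filename, topic);

    // In AudioSourceTask.poll(), inside the loop over listOfFiles:
    log.info("Reading file {}", file);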
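
On the comment about config.getFilenameConfig() returning nothing: in the posted classes the config field is never assigned before it is used, so one possible fix, sketched here and not necessarily the asker's eventual solution, is to build the config object from the props map at the start of both start() methods:

    @Override
    public void start(Map<String, String> props) {
        // Without this assignment the 'config' field stays null.
        config = new AudioSourceConnectorConfig(props);

        filename = config.getFilenameConfig();
        topic = config.getTopicConfig();
        // ... rest of start() unchanged
    }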
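
The producer settings mentioned in the discussion are worker-level overrides: in connect-standalone.properties they take a producer. prefix. The values below are placeholders, not the ones the asker actually used:

    # Let the source task send larger records (values are illustrative)
    producer.max.request.size=20971520
    producer.buffer.memory=67108864

For very large audio files the broker's message.max.bytes (and the topic's max.message.bytes) may also need a matching increase. The heap itself can be raised as suggested in the last comment by exporting KAFKA_HEAP_OPTS="-Xmx4g" before starting Connect (on Windows, set KAFKA_HEAP_OPTS=-Xmx4g before running the .bat scripts).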
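
A rough sketch of the queue idea from the last comment, assuming the task keeps a single WatchService, seeds an ArrayDeque with the files already present, and then reads at most one file per poll() call. The class and method names here are made up for illustration:

    package hothman.example;

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.WatchEvent;
    import java.nio.file.WatchKey;
    import java.nio.file.WatchService;
    import java.util.ArrayDeque;
    import java.util.Queue;

    import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;

    // Hypothetical helper that AudioSourceTask could own: existing files are
    // queued once at startup, new files arrive via WatchService creation
    // events, and poll() takes one file at a time instead of re-reading the
    // whole directory.
    class PendingFileQueue {

        private final Queue<Path> pending = new ArrayDeque<>();
        private final WatchService watchService;
        private final Path dir;

        PendingFileQueue(Path dir) throws IOException {
            this.dir = dir;
            this.watchService = dir.getFileSystem().newWatchService();
            dir.register(watchService, ENTRY_CREATE);
            // WatchService only reports changes, so seed the queue with the
            // files that are already in the directory.
            try (DirectoryStream<Path> existing = Files.newDirectoryStream(dir)) {
                existing.forEach(pending::add);
            }
        }

        // Non-blocking: drain any new creation events into the queue, then
        // return the next file to read, or null if nothing is waiting.
        Path nextFile() {
            WatchKey key = watchService.poll();
            if (key != null) {
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == ENTRY_CREATE) {
                        pending.add(dir.resolve((Path) event.context()));
                    }
                }
                key.reset();
            }
            return pending.poll();
        }
    }

With this shape, poll() would read and return a single file per call, which also keeps memory bounded, and a file that is still locked by the copying process on Windows could simply be re-queued and retried on a later poll.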
Tags: java apache-kafka apache-kafka-connect