【Question Title】: DynamoDB to Elasticsearch with AWS SDK v2?
【Posted】: 2020-10-31 06:07:54
【Question】:

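I came across this question and answer, which shows how to push data from DynamoDB to Elasticsearch for full-text search indexing. However, our application does not use Lambda. Instead, we use Apache Camel to capture DynamoDB Streams events, and we want to push the records to Elasticsearch from there.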

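Because we use AWS SDK v2, we do not receive the DynamodbEvent class, or the corresponding DynamodbStreamRecord class, that wraps the DynamoDB record. Instead, we receive a software.amazon.awssdk.services.dynamodb.model.Record object. Given that, how can we serialize this data and then index it in Elasticsearch? In the other question referenced above, the record is converted to a JSON string and then sent to Elasticsearch. Is there a way to do this with the v2 Record class? The ItemUtils class mentioned in that answer no longer exists, so I don't know of another way to serialize it.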

Any help you can provide is greatly appreciated!

【Comments】:

    Tags: java amazon-web-services elasticsearch amazon-dynamodb amazon-dynamodb-streams


    【Solution 1】:

    Similar to the example you linked, you could try something like the following:

    public void processRecord(Record record, String index, String type, RestHighLevelClient esClient) throws Exception {
      // Get operation
      final OperationType operationType = record.eventName();
      // Obtain a reference to actual DynamoDB stream record
      final StreamRecord streamRecord = record.dynamodb();
      // Get ID. Assume single numeric attribute as partition key
      final Map<String,AttributeValue> keys = streamRecord.keys();
      final String recordId = keys.get("ID").n();
    
      switch (operationType) {
        case INSERT: {
          if (!streamRecord.hasNewImage()) {
            throw new IllegalArgumentException("No new image when inserting");
          }
          Map<String,AttributeValue> newImage = streamRecord.newImage();
          // Where toJson is defined here https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java
          // and included below
          JsonObject jsonObject = DynamoDBUtil.toJson(newImage);
          IndexRequest indexRequest = new IndexRequest(index, type, recordId);
          indexRequest.source(jsonObject.toString(), XContentType.JSON);
          IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
          System.out.println("New content successfully indexed: " + indexResponse);
          break;
        }
        // Each case body is wrapped in braces so newImage and jsonObject
        // can be declared again without clashing in the shared switch scope
        case MODIFY: {
          if (!streamRecord.hasNewImage()) {
            throw new IllegalArgumentException("No new image when updating");
          }
          Map<String,AttributeValue> newImage = streamRecord.newImage();
          JsonObject jsonObject = DynamoDBUtil.toJson(newImage);
          UpdateRequest updateRequest = new UpdateRequest(index, type, recordId);
          updateRequest.doc(jsonObject.toString(), XContentType.JSON);
          UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT);
          System.out.println("Content successfully updated: " + updateResponse);
          break;
        }
        case REMOVE:
          DeleteRequest deleteRequest = new DeleteRequest(index, type, recordId);
          DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
          System.out.println("Successfully removed: " + deleteResponse);
          break;
        default:
          throw new UnsupportedOperationException("Operation type " + operationType + " not supported");
      }
    }
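
The handler above assumes a single numeric partition key named "ID". If the table uses a composite key (partition key plus sort key), one option is to build the Elasticsearch document id from all key attributes in a fixed order. The following is only a sketch under that assumption: the class `DocumentIds`, the method `documentId`, and the `|` separator are hypothetical, and the key values are assumed to have already been read out of the stream record as strings.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class DocumentIds {

    // Hypothetical helper: joins the key values in attribute-name order so the
    // same item always maps to the same Elasticsearch document id.
    public static String documentId(Map<String, String> keyValues) {
        if (keyValues == null || keyValues.isEmpty()) {
            throw new IllegalArgumentException("Stream record has no key attributes");
        }
        // TreeMap sorts by attribute name, so the id does not depend on map iteration order
        return new TreeMap<>(keyValues).values().stream()
                .collect(Collectors.joining("|"));
    }

    public static void main(String[] args) {
        // "CreatedAt" sorts before "ID", so its value comes first
        System.out.println(documentId(Map.of("ID", "42", "CreatedAt", "2020-10-31")));
        // prints 2020-10-31|42
    }
}
```

If key values can themselves contain the separator, a different separator or an escaping scheme would be needed to keep ids unambiguous.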
    

    The toJson method is defined in this class: https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java

    The source code is reproduced here:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.math.BigDecimal;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.zip.DataFormatException;
    import java.util.zip.Deflater;
    import java.util.zip.Inflater;
    
    import javax.json.Json;
    import javax.json.JsonArray;
    import javax.json.JsonArrayBuilder;
    import javax.json.JsonNumber;
    import javax.json.JsonObject;
    import javax.json.JsonObjectBuilder;
    import javax.json.JsonString;
    import javax.json.JsonStructure;
    import javax.json.JsonValue;
    
    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
    
    /** This is a utility for converting DynamoDB AttributeValues to and from Java JSON-P objects */
    public class DynamoDBUtil {
    
        public static void addList(String key, JsonObjectBuilder objectBuilder, List<JsonObject> items) {
            if (!items.isEmpty()) {
                JsonArrayBuilder builder = Json.createArrayBuilder();
                items.forEach(i -> builder.add(i));
                objectBuilder.add(key, builder.build());
            }
    
        }
    
        public static JsonArray toJson(List<AttributeValue> attributeValues) {
            if (attributeValues == null) {
                return null;
            }
            JsonArrayBuilder valueBuilder = Json.createArrayBuilder();
            for (AttributeValue a : attributeValues) {
                add(toJson(a), valueBuilder);
            }
            return valueBuilder.build();
        }
    
        public static JsonObject toJson(Map<String, AttributeValue> attributeValues) {
            if (attributeValues == null) {
                return null;
            }
            JsonObjectBuilder valueBuilder = Json.createObjectBuilder();
            for (Map.Entry<String, AttributeValue> a : attributeValues.entrySet()) {
                add(a.getKey(), toJson(a.getValue()), valueBuilder);
            }
            return valueBuilder.build();
        }
    
        public static void add(String key, Object value, JsonObjectBuilder object) {
            if (value instanceof JsonValue) {
                object.add(key, (JsonValue) value);
                // with json-p 1.0 can't create JsonString or JsonNumber so simply setting JsonValue not an option.
            } else if (value instanceof String) {
                object.add(key, (String) value);
            } else if (value instanceof BigDecimal) {
                object.add(key, (BigDecimal) value);
            } else if (value instanceof Boolean) {
                object.add(key, (Boolean) value);
            } else if (value == null || value.equals(JsonValue.NULL)) {
                object.addNull(key);
            }
    
        }
    
        public static void add(Object value, JsonArrayBuilder array) {
            if (value instanceof JsonValue) {
                array.add((JsonValue) value);
            } else if (value instanceof String) {
                array.add((String) value);
            } else if (value instanceof BigDecimal) {
                array.add((BigDecimal) value);
            } else if (value instanceof Boolean) {
                array.add((Boolean) value);
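            } else if (value == null || value.equals(JsonValue.NULL)) {
                // toJson(AttributeValue) returns null for binary values, so guard against a NullPointerException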
                array.addNull();
            }
    
        }
    
        public static Object toJson(AttributeValue attributeValue) {
            // with json-p 1.1 Json.createValue() can be used.
    
            if (attributeValue == null) {
                return null;
            }
            if (attributeValue.s() != null) {
                return attributeValue.s();
            }
            if (attributeValue.n() != null) {
                return new BigDecimal(attributeValue.n());
            }
            if (attributeValue.bool() != null) {
                // return attributeValue.bool() ? JsonValue.TRUE : JsonValue.FALSE;
                return attributeValue.bool();
            }
    
            if (attributeValue.b() != null) {
                // return Base64.getEncoder().encodeToString(attributeValue.b().array());
                return null;
            }
    
            if (attributeValue.nul() != null && attributeValue.nul()) {
                return JsonValue.NULL;
            }
    
            if (!attributeValue.m().isEmpty()) {
                return toJson(attributeValue.m());
            }
            if (!attributeValue.l().isEmpty()) {
                return toJson(attributeValue.l());
            }
    
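            // Note: the add(...) helpers above have no branch for List values,
            // so string sets and number sets returned here are silently skipped.
            if (!attributeValue.ss().isEmpty()) {
                return attributeValue.ss();
            }
    
            if (!attributeValue.ns().isEmpty()) {
                return attributeValue.ns();
            }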
    
            if (!attributeValue.bs().isEmpty()) {
                //return attributeValue.bs();
                return null;
            }
            return null;
        }
    
        public static Map<String, AttributeValue> toAttribute(JsonObject jsonObject) {
            Map<String, AttributeValue> attribute = new HashMap<>();
            jsonObject.entrySet().forEach(e -> {
                attribute.put(e.getKey(), toAttribute(e.getValue()));
            });
            return attribute;
        }
    
        public static List<AttributeValue> toAttribute(JsonArray jsonArray) {
            List<AttributeValue> attributes = new LinkedList<>();
            jsonArray.forEach(e -> {
                attributes.add(toAttribute(e));
            });
            return attributes;
        }
    
        public static AttributeValue toAttribute(JsonValue jsonValue) {
            if (jsonValue == null) {
                return null;
            }
            switch (jsonValue.getValueType()) {
            case STRING:
                return AttributeValue.builder().s(((JsonString) jsonValue).getString()).build();
            case OBJECT:
                return AttributeValue.builder().m(toAttribute((JsonObject) jsonValue)).build();
            case ARRAY:
                return AttributeValue.builder().l(toAttribute((JsonArray) jsonValue)).build();
            case NUMBER:
                return AttributeValue.builder().n(((JsonNumber) jsonValue).toString()).build();
            case TRUE:
                return AttributeValue.builder().bool(true).build();
            case FALSE:
                return AttributeValue.builder().bool(false).build();
            case NULL:
                return AttributeValue.builder().nul(true).build();
            }
    
            return null;
        }
    
        public static AttributeValue compress(Map<String, AttributeValue> attributeValues) throws IOException {
            return compress(toJson(attributeValues));
        }
    
        public static AttributeValue compress(List<AttributeValue> attributeValues) throws IOException {
            return compress(toJson(attributeValues));
        }
    
        public static AttributeValue compress(JsonStructure jsonStructure) throws IOException {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            Json.createWriter(outputStream).write(jsonStructure);
            outputStream.close();
            byte[] jsonBinary = outputStream.toByteArray();
    
            outputStream = new ByteArrayOutputStream();
            Deflater deflater = new Deflater();
            deflater.setInput(jsonBinary);
            deflater.finish();
            byte[] buffer = new byte[1024];
            while (!deflater.finished()) {
                int count = deflater.deflate(buffer); // number of compressed bytes written to buffer
                outputStream.write(buffer, 0, count);
            }
            outputStream.close();
            jsonBinary = outputStream.toByteArray();
    
            return AttributeValue.builder().b(SdkBytes.fromByteArray(jsonBinary)).build();
        }
    
        public static JsonStructure decompress(AttributeValue attributeValue) throws IOException, DataFormatException {
            Inflater inflater = new Inflater();
            byte[] jsonBinary = attributeValue.b().asByteArray();
            inflater.setInput(jsonBinary);
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(jsonBinary.length);
            byte[] buffer = new byte[1024];
            while (!inflater.finished()) {
                int count = inflater.inflate(buffer);
                outputStream.write(buffer, 0, count);
            }
            outputStream.close();
            byte[] output = outputStream.toByteArray();
            ByteArrayInputStream bis = new ByteArrayInputStream(output);
            return Json.createReader(bis).read();
        }
    
    }
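
Note that toJson(AttributeValue) above returns null for binary attributes (the Base64 line is commented out), so binary data never reaches the indexed document. If binary values are needed in Elasticsearch, one possible approach is to Base64-encode them into a string first. The sketch below uses only the JDK; the class `BinaryAttributeEncoding` and its methods are hypothetical and take a plain byte array, which in SDK v2 could presumably be obtained with `attributeValue.b().asByteArray()` as the decompress method above does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BinaryAttributeEncoding {

    // Hypothetical helper: turns raw attribute bytes into a JSON-safe Base64 string
    public static String encodeBinary(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    // Reverses encodeBinary, returning the original bytes
    public static byte[] decodeBinary(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        String encoded = encodeBinary(raw);
        System.out.println(encoded); // prints aGVsbG8=
        System.out.println(new String(decodeBinary(encoded), StandardCharsets.UTF_8)); // prints hello
    }
}
```

The encoded string can then be added to the JSON object like any other string value.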
    

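    This class is an updated version of the one originally introduced in this gist.

    If you prefer to use that library for JSON serialization, the post also provides a link to an AttributeValue serializer for Jackson.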
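
The compress and decompress helpers in DynamoDBUtil are built on java.util.zip's Deflater and Inflater. To show that round trip in isolation, here is a minimal JDK-only sketch. The class `DeflateRoundTrip` and its methods are hypothetical and are not part of the utility class. Unlike the original, it releases the native resources with `end()`, stops on a truncated stream instead of looping, and wraps the checked DataFormatException in an unchecked exception.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class DeflateRoundTrip {

    // Compresses the input with the default deflate settings
    public static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int count = deflater.deflate(buffer); // bytes written to buffer
            out.write(buffer, 0, count);
        }
        deflater.end(); // release native memory
        return out.toByteArray();
    }

    // Decompresses data produced by deflate(...)
    public static byte[] inflate(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int count = inflater.inflate(buffer);
                if (count == 0 && inflater.needsInput()) {
                    // No progress and no more input: the stream was cut short
                    throw new IllegalArgumentException("Truncated deflate stream");
                }
                out.write(buffer, 0, count);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("Corrupt deflate stream", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] json = "{\"ID\":42,\"title\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        byte[] restored = inflate(deflate(json));
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```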

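    【Comments】:

    • This worked like a charm! DynamoDbUtil was exactly what I was missing. I'm not sure how I didn't know about it!
    • Great, @Shadowman! I agree with you, all the credit goes to the DynamoDbUtil class. I'm glad to hear the answer was helpful!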