【Question title】: Local testing Apache Beam with SqlTransform in Python SDK. I receive an error: "Execution of [%s] not implemented in runner %s."
【Posted】: 2026-01-21 08:45:01
【Question】:

I get an error when using SqlTransform on my local Apache Beam instance. Here is a simple test:

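import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# FruitRecipe is not shown in the question; it is presumably an indexable
# 4-field record (recipe, fruit, quantity, unit_cost), e.g. a NamedTuple.
with beam.Pipeline() as p: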
    pc = (p | beam.Create([
        FruitRecipe("pie", "strawberry", 3, 1.5),
        FruitRecipe("muffin", "blueberry", 2, 2.),
        ])
        | beam.Map(lambda x: beam.Row(recipe = x[0],  # str
                                 fruit = x[1],    # str
                                 quantity = x[2], # int
                                 unit_cost = x[3], # float
                                 is_berry = x[1].endswith('berry')))) # bool
    pc | SqlTransform(" SELECT * FROM PCOLLECTION WHERE quantity > 1")

Error:

RuntimeError                              Traceback (most recent call last)
     11                                  is_berry = x[1].endswith('berry')))) # bool
     12
---> 13     pc | SqlTransform(" SELECT * FROM PCOLLECTION WHERE quantity > 1") #| beam.Map(print)

~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/pvalue.py in __or__(self, ptransform)
    139 
    140   def __or__(self, ptransform):
--> 141     return self.pipeline.apply(ptransform, self)
    142 
    143 

~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    689         transform.type_check_inputs(pvalueish)
    690 
--> 691       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    692 
    693       if type_options is not None and type_options.pipeline_type_check:

~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
    196       m = getattr(self, 'apply_%s' % cls.__name__, None)
    197       if m:
--> 198         return m(transform, input, options)
    199     raise NotImplementedError(
    200         'Execution of [%s] not implemented in runner %s.' % (transform, self))

~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
    226   def apply_PTransform(self, transform, input, options):
    227     # The base case of apply is to call the transform's expand.
--> 228     return transform.expand(input)
    229 
    230   def run_transform(self,

~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/transforms/external.py in expand(self, pvalueish)
    316       response = service.Expand(request)
    317       if response.error:
--> 318         raise RuntimeError(response.error)
    319       self._expanded_components = response.components
    320       if any(env.dependencies

RuntimeError: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:139)
    at org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$0(ExpansionService.java:422)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1577)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1602)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:417)
    at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
    at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.core.construction.RehydratedComponents.getCoder(RehydratedComponents.java:168)
    at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
    at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:108)
    at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:98)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 27 more
Caused by: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
    at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:316)
    at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:232)
    at org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:226)
    at org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:212)
    at org.apache.beam.runners.core.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:169)
    at org.apache.beam.runners.core.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:151)
    at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:170)
    at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
    at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
    at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 38 more


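【Question comments】:

    Tags: python sql sdk apache-beam transformation


    【Solution 1】:

    tl;dr: You should be able to work around this by adding a cast around each field in the beam.Row call, according to the type you expect, for example: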

            | beam.Map(lambda x: beam.Row(recipe = str(x[0]),
                                     fruit = str(x[1]),
                                     quantity = int(x[2]),
                                     unit_cost = float(x[3]),
                                     is_berry = bool(x[1].endswith('berry')))))
    

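    Details

    The error java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1 means that Beam Python could not determine the type of one of the fields in the PCollection you passed to SqlTransform. Normally this is not a big deal: Beam Python simply uses a fallback (called beam:logical:pythonsdk_any:v1) that encodes the values of any such field with Python serialization (i.e. pickle). This works fine, because downstream Python transforms are perfectly capable of reading that pickle-encoded data. It may cost some performance, but it won't break your pipeline.

    With SqlTransform, however, we actually use the Java SDK's implementation, which knows nothing about Python serialization. So when it encounters beam:logical:pythonsdk_any:v1, it gives up.

    The solution I suggested above, adding a cast around each value, ensures that Beam Python infers a *specific* type for each field, one that we can encode in a portable way.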
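    As a complementary sketch, and not the approach from this answer, another way to give Beam Python a specific type for every field is to declare the schema up front as a NamedTuple and register it with the row coder. The names TypedRecipe, to_typed_recipe and run are hypothetical, and the input is written as plain tuples because the question does not show how FruitRecipe is defined. Beam is imported inside run() only so that the conversion helper can be tried without Beam installed.

```python
import typing


class TypedRecipe(typing.NamedTuple):
    """Hypothetical schema type; field names mirror the question's beam.Row."""
    recipe: str
    fruit: str
    quantity: int
    unit_cost: float
    is_berry: bool


def to_typed_recipe(x) -> TypedRecipe:
    """Convert a (recipe, fruit, quantity, unit_cost) record to a TypedRecipe."""
    return TypedRecipe(
        recipe=str(x[0]),
        fruit=str(x[1]),
        quantity=int(x[2]),
        unit_cost=float(x[3]),
        is_berry=str(x[1]).endswith("berry"),
    )


def run():
    # Imported here so the helper above does not depend on Beam being installed.
    import apache_beam as beam
    from apache_beam.transforms.sql import SqlTransform

    # Ask Beam to encode TypedRecipe with the portable row (schema) coder
    # instead of falling back to pickle.
    beam.coders.registry.register_coder(TypedRecipe, beam.coders.RowCoder)

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([("pie", "strawberry", 3, 1.5),
                           ("muffin", "blueberry", 2, 2.0)])
            # with_output_types tells Beam the element type explicitly.
            | beam.Map(to_typed_recipe).with_output_types(TypedRecipe)
            | SqlTransform("SELECT * FROM PCOLLECTION WHERE quantity > 1")
            | beam.Map(print)
        )
```

    Calling run() builds and executes the pipeline. It still goes through the Java implementation of SqlTransform, so it has the same local requirements as the original test.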

    I filed BEAM-11690 for this problem. We should raise a more helpful error message in this case. Thanks for raising the issue!

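    【Comments】:

    • Thanks for the quick answer. I made the change accordingly, and now the error has changed to: FileNotFoundError: [Errno 2] No such file or directory: 'docker'
    • Hmm, are you using the DirectRunner? I think you need to install docker (e.g. for Ubuntu, docs.docker.com/engine/install/ubuntu) to run pipelines that use SqlTransform
    • Yes, that's what I did, and it's working now.
    • Hi @TheNeuralBit. What do I need to install on docker? I'm trying to use the SQL transform in Apache Beam with the direct runner, but I get "ValueError: Unsupported signal: 2". I saw your YouTube video mention the Universal Local Runner (ULR). How do I install it? Is this not possible with the DirectRunner? Thanks in advance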
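
    The FileNotFoundError for 'docker' reported in the comments above is what Python raises when it tries to launch an executable that is not on PATH. A minimal preflight check, using only the standard library, could look like the sketch below. The function name missing_tools is hypothetical, and the default list of tools is an assumption based on this thread: docker comes from the comments, and java is included because SqlTransform relies on the Java SDK's implementation.

```python
import shutil


def missing_tools(tools=("java", "docker")):
    """Return the names from `tools` that cannot be found on PATH."""
    return [name for name in tools if shutil.which(name) is None]


missing = missing_tools()
if missing:
    print("Not found on PATH:", ", ".join(missing))
else:
    print("All required tools were found on PATH")
```

    Running this before the pipeline shows which executable is missing, without having to read a long stack trace.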