【发布时间】:2026-01-21 08:45:01
【问题描述】:
在我的本地 Apache Beam 实例上使用 SqlTransform 时出现错误。这是一个简单的测试:
# Minimal reproduction: build a PCollection of beam.Row objects and run a
# SQL query over it. `beam`, `SqlTransform` and `FruitRecipe` are
# imported/defined outside this snippet.
with beam.Pipeline() as p:
    pc = (
        p
        | beam.Create([
            FruitRecipe("pie", "strawberry", 3, 1.5),
            FruitRecipe("muffin", "blueberry", 2, 2.),
        ])
        # Convert each FruitRecipe into a schema'd Row, accessing fields by
        # position.
        | beam.Map(lambda x: beam.Row(
            recipe=x[0],  # str
            fruit=x[1],  # str
            quantity=x[2],  # int
            unit_cost=x[3],  # float
            is_berry=x[1].endswith('berry'),  # bool
        ))
    )
    # Applying SqlTransform is the step the traceback below points at.
    pc | SqlTransform(" SELECT * FROM PCOLLECTION WHERE quantity > 1")
错误:
RuntimeError Traceback (most recent call last) in 11 is_berry = x[1].endswith('berry')))) # bool 12 ---> 13 pc | SqlTransform(" SELECT * FROM PCOLLECTION WHERE quantity > 1") #| beam.Map(print)
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/pvalue.py in __or__(self, ptransform)
139
140 def __or__(self, ptransform):
--> 141 return self.pipeline.apply(ptransform, self)
142
143
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
689 transform.type_check_inputs(pvalueish)
690
--> 691 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
692
693 if type_options is not None and type_options.pipeline_type_check:
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
196 m = getattr(self, 'apply_%s' % cls.__name__, None)
197 if m:
--> 198 return m(transform, input, options)
199 raise NotImplementedError(
200 'Execution of [%s] not implemented in runner %s.' % (transform, self))
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
226 def apply_PTransform(self, transform, input, options):
227 # The base case of apply is to call the transform's expand.
--> 228 return transform.expand(input)
229
230 def run_transform(self,
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/transforms/external.py in expand(self, pvalueish)
316 response = service.Expand(request)
317 if response.error:
--> 318 raise RuntimeError(response.error)
319 self._expanded_components = response.components
320 if any(env.dependencies
RuntimeError: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:139)
at org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$0(ExpansionService.java:422)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1577)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1602)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:417)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.core.construction.RehydratedComponents.getCoder(RehydratedComponents.java:168)
at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:108)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:98)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 27 more
Caused by: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:316)
at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:232)
at org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:226)
at org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:212)
at org.apache.beam.runners.core.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:169)
at org.apache.beam.runners.core.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:151)
at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:170)
at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 38 more
<
【问题讨论】:
标签: python sql sdk apache-beam transformation