I'm using PyFlink with Flink 1.11.2, and I've defined my table like this:
def _create_sink_table(st_env):
    # Create SINK table. IN is a reserved SQL keyword, so the table name is quoted with backticks.
    st_env.execute_sql(f"""
        CREATE TABLE `in` (
            `a` STRING,
            `b` STRING,
            `c` STRING,
            `d` ROW(
                `e` STRING,
                `f` STRING
            )
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{SINK_TOPIC_NAME}',
            'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
        )
    """)
I want to use table.where to filter on the nested field e inside the ROW column d:
from pyflink.table import StreamTableEnvironment

def execute(st_env: StreamTableEnvironment):
    table = st_env.from_path("in")
    table = table.select("a, b, c, d")
    table = table.where("d.e = 'fail'")
Everything I've tried so far has resulted in an error. Here is one of them:
➜ ./run.sh
Traceback (most recent call last):
  File "/tmp/test.py", line 143, in <module>
    main()
  File "/tmp/test.py", line 139, in main
    execute(st_env)
  File "/tmp/test.py", line 109, in execute
    table = table.where("d.e = 'fail'")
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 140, in where
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.where.
: org.apache.flink.table.api.ValidationException: Undefined function: e
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:49)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:38)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:65)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:64)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
It seems to interpret d.e as a call to a function named e with d as its argument, instead of accessing the subfield e of d.
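Judging by the LookupCallResolver frames in the traceback, the string expression DSL reads x.name as postfix syntax for calling a function name on x, while x.get('name') or x.get(index) is composite field access. The toy evaluator below is only a model of that rule, not Flink's actual parser; the FUNCTIONS registry and the evaluate helper are hypothetical, and it only handles expressions of the form <ref>.<suffix> = '<literal>'.

```python
import re

# Hypothetical function registry standing in for built-in scalar functions.
FUNCTIONS = {"upper_case": str.upper}


def evaluate(expr, row):
    """Toy evaluator for "<ref>.<suffix> = '<literal>'" (not Flink's parser).

    "<ref>.get('name')" / "<ref>.get(index)" is treated as field access;
    any other "<ref>.<suffix>" is treated as calling function <suffix> on <ref>.
    """
    lhs, literal = (part.strip() for part in expr.split("="))
    literal = literal.strip("'")
    ref, suffix = lhs.split(".", 1)
    value = row[ref]
    get_call = re.fullmatch(r"get\((?:'(\w+)'|(\d+))\)", suffix)
    if get_call:
        name, index = get_call.groups()
        # Access by name, or by position; the dict's insertion order
        # stands in for the ROW's declared field order.
        value = value[name] if name else list(value.values())[int(index)]
    elif suffix in FUNCTIONS:
        value = FUNCTIONS[suffix](value)
    else:
        # Same failure mode as the ValidationException in the traceback.
        raise ValueError(f"Undefined function: {suffix}")
    return value == literal
```

With row = {"a": "1", "b": "2", "c": "3", "d": {"e": "fail", "f": "ok"}}, both evaluate("d.get('e') = 'fail'", row) and evaluate("d.get(0) = 'fail'", row) return True, while evaluate("d.e = 'fail'", row) raises ValueError("Undefined function: e").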
I think something like this, which accesses the field by its index, might work:
table = table.where("d.get(0) = 'fail'")
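A different route, if the string expression DSL stays awkward, is plain Flink SQL, where a nested ROW field can be read with dot notation (table.column.field). The sketch below only builds the query string and passes it to st_env.sql_query, assuming the in table from the DDL above is registered. The helper names quote_ident, nested_filter_query and filter_nested are hypothetical, and the alias t is added on the assumption that qualifying the path avoids any ambiguity between a column d and a table named d.

```python
def quote_ident(name):
    """Wrap an identifier in backticks (needed for reserved words like IN)."""
    return f"`{name}`"


def nested_filter_query(table_name, columns, field_path, value):
    """Build a Flink SQL query that filters on a nested ROW field.

    field_path is a dotted path such as "d.e" (column d, subfield e).
    """
    select_list = ", ".join(quote_ident(c) for c in columns)
    # Prefix the path with the table alias t, giving t.`d`.`e`.
    path = ".".join(["t"] + [quote_ident(p) for p in field_path.split(".")])
    # Escape single quotes in the literal by doubling them (standard SQL).
    literal = "'" + value.replace("'", "''") + "'"
    return (f"SELECT {select_list} FROM {quote_ident(table_name)} AS t "
            f"WHERE {path} = {literal}")


def filter_nested(st_env, table_name, columns, field_path, value):
    """Run the query through a PyFlink table environment; returns a Table."""
    return st_env.sql_query(
        nested_filter_query(table_name, columns, field_path, value))
```

For example, filter_nested(st_env, "in", ["a", "b", "c", "d"], "d.e", "fail") sends SELECT `a`, `b`, `c`, `d` FROM `in` AS t WHERE t.`d`.`e` = 'fail' to the planner.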