Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

python - PyFlink (Flink) 1.12.0: error when converting a Table to a DataStream via to_append_stream (Java API: toAppendStream)

Thanks a lot for any help!!!

code:

from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Create the table environment in streaming mode
env_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
env_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)

table1 = env_stream.from_elements([(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')], ['id', 'order_amt', 'name'])
table2 = env_stream.from_elements([(1, 43.4, 'xixi'), (2, 53.4, 'rr'), (3, 65.6, 'ww')], ['id2', 'order_amt2', 'name'])

# types: List[TypeInformation], field_names: List[str]
# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], ['id', 'order_amt', 'name'])
row_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])


stream = env_stream.to_append_stream(table1, row_type_info)

error info:

Traceback (most recent call last):
  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o4.toAppendStream. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method toAppendStream([class org.apache.flink.table.api.internal.TableImpl, class org.apache.flink.api.java.typeutils.TupleTypeInfo]) does not exist
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

env:

  1. apache-flink 1.12.0(python flink)
  2. py4j 0.10.8.1 (installed automatically as a dependency by pip3 install apache-flink)
  3. python 3.7(anaconda)
  4. pycharm 2020.1.1 version
  5. mac os 11.1

debug info images: two screenshots (debug info image 1, debug info image 2) were attached to the original post.

reproduction step:

  1. In the same environment, run the code locally (local mode).
  2. Set a breakpoint on the line: "stream = env_stream.to_append_stream(table1, row_type_info)"
  3. Run in debug mode. The breakpoint is hit twice: the first time the toAppendStream method is not found, the second time it is found. The exception is raised on the first hit.
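
The error message says Py4J could not find toAppendStream on the Java object o4, so it may help to check which Java class actually sits behind env_stream before the failing call. A minimal sketch, assuming PyFlink keeps the Py4J proxy in the private attribute _j_tenv (an implementation detail that may differ between releases); java_class_behind is a hypothetical helper name, not part of PyFlink:

```python
def java_class_behind(table_env):
    """Return the fully qualified name of the Java class wrapped by a PyFlink table environment.

    Assumption: the Py4J proxy object is stored in the private attribute `_j_tenv`.
    """
    return table_env._j_tenv.getClass().getName()


# Usage with the environment from the code above (needs a working PyFlink install):
# print(java_class_behind(env_stream))
```

If the printed class is not a StreamTableEnvironment implementation, that would be consistent with the "does not exist" message; this is something to verify, not a confirmed diagnosis.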


1 Answer

Waiting for an expert to answer.
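
Not a confirmed fix, but one thing worth trying: the question builds the table environment from environment_settings alone. A possible explanation (an assumption, not verified in this thread) is that this produces a plain Java TableEnvironment, which has no toAppendStream method. The sketch below passes a StreamExecutionEnvironment explicitly and declares the row type with Types.ROW, using LONG/DOUBLE/STRING on the assumption that from_elements infers BIGINT and DOUBLE for Python int and float values. build_append_stream, ROWS and FIELD_NAMES are names made up for this example.

```python
ROWS = [(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')]
FIELD_NAMES = ['id', 'order_amt', 'name']


def build_append_stream():
    """Create the table from the question and convert it to a DataStream."""
    # Imported here so this file still loads where PyFlink is not installed.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    settings = (EnvironmentSettings.new_instance()
                .use_blink_planner()
                .in_streaming_mode()
                .build())
    # Passing exec_env is the change relative to the question's code.
    t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

    table1 = t_env.from_elements(ROWS, FIELD_NAMES)
    # Assumed field types: int -> LONG, float -> DOUBLE, str -> STRING.
    row_type = Types.ROW([Types.LONG(), Types.DOUBLE(), Types.STRING()])
    stream = t_env.to_append_stream(table1, row_type)
    return exec_env, stream


# Usage (needs apache-flink installed):
# exec_env, stream = build_append_stream()
# stream.print()
# exec_env.execute("to_append_stream_check")
```

Note that the question's type info declares the first field as a string while the data holds integers, so the declared types may also need to match the table schema for the conversion to succeed.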
