Flink Table API
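'''
Description: henggao_note
version: v1.0.0
Date: 2022-03-13 22:11:10
LastEditors: henggao
LastEditTime: 2022-03-13 22:27:15
'''
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import lit

# Batch-mode table environment. On Flink 1.14+ the Blink planner is the
# only planner, so use_blink_planner() is redundant there.
environment_settings = EnvironmentSettings.new_instance() \
    .use_blink_planner().in_batch_mode().build()
t_env = TableEnvironment.create(environment_settings=environment_settings)
# Parallelism 1 so the sink writes a single output file.
t_env.get_config().get_configuration().set_string('parallelism.default', '1')

# Source table: a one-column CSV file. The raw string (r""") keeps the
# backslashes in the Windows path from being read as Python escapes.
t_env.execute_sql(r"""
    CREATE TABLE mySource (
        word STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = 'D:\Cumtb_Code\Flink\input.csv'
    )
""")

# Sink table: CSV files written under the word_count_output directory.
t_env.execute_sql(r"""
    CREATE TABLE mySink (
        word STRING,
        `count` BIGINT
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = 'D:\Cumtb_Code\Flink\word_count_output'
    )
""")

# Group by word, count the rows in each group, and write the result to the sink.
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
    .execute_insert('mySink').wait()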
Create the file input.csv with the following content, one word per line:
flink
pyflink
flink
kafka
python
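The input file can also be generated from Python instead of by hand. This is a minimal sketch, assuming one word per line to match the single `word STRING` column. The helper name `write_input` and the relative path `input.csv` are hypothetical placeholders, not part of the original script.

```python
from pathlib import Path

# Sample words from this note; each one becomes one row of mySource.
WORDS = ["flink", "pyflink", "flink", "kafka", "python"]


def write_input(path, words=WORDS):
    """Write one word per line and return the resulting Path (hypothetical helper)."""
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text("\n".join(words) + "\n", encoding="utf-8")
    return p


if __name__ == "__main__":
    # Placeholder path; point it at the location used in the 'path' option.
    write_input("input.csv")
```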
Running the script writes the output files to the word_count_output directory. The word-count result is:
flink,2
pyflink,1
kafka,1
python,1
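To cross-check the result without starting Flink, the same grouping and counting can be reproduced in plain Python. This is only a sketch of the equivalent logic using `collections.Counter`, not how Flink computes it. The function name `word_count` is hypothetical, and the order of rows in Flink's output files is not guaranteed to match.

```python
from collections import Counter


def word_count(lines):
    """Count each non-empty line, mirroring GROUP BY word with COUNT."""
    return Counter(line.strip() for line in lines if line.strip())


counts = word_count(["flink", "pyflink", "flink", "kafka", "python"])
for word, n in counts.items():
    # Same "word,count" layout as the CSV sink.
    print(f"{word},{n}")
```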