Published: 2019-04-29


Flink 1.7.2 SQL Batch Processing Examples

Source code

Overview

  • This article presents Flink SQL examples on the batch DataSet API.
  • Operations covered: Scan / Select, as (table), as (column), limit, Where / Filter, between and (where), sum, min, max, avg, sum (group by), group by having, distinct, INNER JOIN, left join, right join, full outer join, union, unionAll, INTERSECT, in, EXCEPT, insert into. All examples share the common skeleton sketched below.
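Each example follows the same pattern: create a batch ExecutionEnvironment, wrap it in a TableEnvironment, register a DataSet as a table, run a SQL query, and print the result. The following minimal skeleton is a sketch distilled from the examples in this article (the package, table name, columns, and sample data are placeholders), not an additional example from the original post.

package example.skeleton

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Skeleton {

  def main(args: Array[String]): Unit = {

    // batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // sample data; every example registers a DataSet like this
    val dataSet = env.fromElements(("小明", 15, "男"), ("小王", 45, "男"))

    // Table environment on top of the batch environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the DataSet as a table with named columns
    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    // run a SQL query; the implicit conversion from the scala Table API
    // turns the result into a DataSet, so first(100).print() works
    tableEnv.sqlQuery("select name, age FROM user1")
      .first(100)
      .print()
  }
}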

SELECT

Scan / Select

  • Feature: query all rows of a table.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    tableEnv.sqlQuery(s"select name,age FROM user1")
      .first(100).print()

    /**
      * Output
      *
      * 小明,15
      * 小王,45
      * 小李,25
      * 小慧,35
      */
  }
}
  • Output
小明,15
小王,45
小李,25
小慧,35
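For comparison, the same query can also be written with the Table API instead of a SQL string. This is only a sketch under the assumption that the user1 table and the imports from the program above are in scope; it is not part of the original example.

// Table API sketch of "select name,age FROM user1"
val result = tableEnv
  .scan("user1")          // look up the registered table
  .select('name, 'age)    // project the two columns

result.first(100).print()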

as (table)

  • Feature: give a table an alias.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    tableEnv.sqlQuery(s"select t1.name,t1.age FROM user1 as t1")
      .first(100).print()

    /**
      * Output
      *
      * 小明,15
      * 小王,45
      * 小李,25
      * 小慧,35
      */
  }
}
  • Output
小明,15
小王,45
小李,25
小慧,35

as (column)

  • Feature: give columns aliases.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    tableEnv.sqlQuery(s"select name a,age as b FROM user1 ")
      .first(100).print()

    /**
      * Output
      *
      * 小明,15
      * 小王,45
      * 小李,25
      * 小慧,35
      */
  }
}
  • Output
小明,15
小王,45
小李,25
小慧,35

limit

  • Feature: return only the first N rows of a table. The limit is applied per parallel instance (partition), so different parallelism settings can produce different results.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.limit

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    /**
      * Sort by age in descending order and return the first 100 rows.
      * Note that the sort is applied within each parallel instance, i.e. within a single partition.
      */
    tableEnv.sqlQuery(s"select name,age FROM user1  ORDER BY age desc LIMIT 100  ")
      .first(100).print()

    /**
      * Output with parallelism 2
      *
      * 小明,15
      * 小王,45
      * 小慧,35
      * 小李,25
      */

    /**
      * Output with parallelism 1
      *
      * 小王,45
      * 小慧,35
      * 小李,25
      * 小明,15
      */
  }
}
  • Output (with parallelism 2)
小明,15
小王,45
小慧,35
小李,25
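The Table API offers the same ordering and row limiting through orderBy and fetch. The sketch below assumes the user1 table and the imports from the program above and is not taken from the original post; like the SQL variant, the ordering is applied per parallel instance.

// Table API sketch of "ORDER BY age desc LIMIT 100"
val topRows = tableEnv
  .scan("user1")
  .select('name, 'age)
  .orderBy('age.desc)   // sort within each parallel partition
  .fetch(100)           // return at most 100 rows

topRows.first(100).print()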

Where / Filter

  • Feature: filter rows by a condition on a column.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    tableEnv.sqlQuery(s"select name,age,sex FROM user1 where sex = '女'")
      .first(100).print()

    /**
      * Output
      *
      * 小李,25,女
      * 小慧,35,女
      */
  }
}
  • Output
小李,25,女
小慧,35,女

between and (where)

  • Feature: filter a column to a range, lower bound <= value <= upper bound.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereBetweenAnd

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    tableEnv.sqlQuery(s"select name,age,sex FROM user1 where age between 20 and  35")
      .first(100).print()

    /**
      * Output
      *
      * 小李,25,女
      * 小慧,35,女
      */
  }
}
  • Output
小李,25,女
小慧,35,女

Sum

  • Feature: sum a column over all rows.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)

    // sum over all rows
    tableEnv.sqlQuery(s"select sum(salary) FROM user1")
      .first(100).print()

    /**
      * Output
      *
      * 6800
      */
  }
}
  • Output
6800

max

  • Feature: find the maximum value.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)

    // maximum over all rows
    tableEnv.sqlQuery(s"select max(salary) FROM user1 ")
      .first(100).print()

    /**
      * Output
      *
      * 4000
      */
  }
}
  • Output
4000

min

  • Feature: find the minimum value.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)

    tableEnv.sqlQuery(s"select min(salary) FROM user1 ")
      .first(100).print()

    /**
      * Output
      *
      * 500
      */
  }
}
  • Output
500

sum (group by)

  • Feature: group by sex and sum salaries.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)

    // sum salaries per group
    tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex")
      .first(100).print()

    /**
      * Output
      *
      * 女,1300
      * 男,5500
      */
  }
}
  • Output
女,1300
男,5500
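The same grouped aggregation can be expressed with the Table API's groupBy. A minimal sketch, assuming the user1 table (with the salary column) and the imports from the program above; the alias 'totalSalary is only for illustration.

// Table API sketch of "select sex,sum(salary) FROM user1 group by sex"
val grouped = tableEnv
  .scan("user1")
  .groupBy('sex)
  .select('sex, 'salary.sum as 'totalSalary)

grouped.first(100).print()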

group by having

  • Feature: group by sex, sum salaries, and filter the groups with HAVING.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group_having

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)

    // grouped aggregation; HAVING filters on the grouped result
    tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex having sum(salary) >1500")
      .first(100).print()

    /**
      * Output
      *
      * 男,5500
      */
  }
}
  • Output
男,5500

distinct

  • Feature: deduplicate one or more columns.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a",15,"male"),("a",45,"female"),("d",25,"male"),("c",35,"female"))
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    /**
      * Deduplicate the name column
      */
    tableEnv.sqlQuery("select distinct name  FROM user1   ")
      .first(100).print()

    /**
      * Output
      *
      * a
      * c
      * d
      */
  }
}
  • Output
a
c
d

join

INNER JOIN

  • Feature: join two tables on the given columns; a row is emitted only when both sides match.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50) )

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)

    // inner join of the two tables
    // tableEnv.sqlQuery("select * FROM `user`  INNER JOIN  grade on  `user`.id = grade.userId ")
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user`  INNER JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()

    /**
      * Output
      *
      * 2,小王,45,男,4000,数学,80
      * 1,小明,15,男,1500,语文,100
      * 1,小明,15,男,1500,外语,50
      */
  }
}
  • Output
2,小王,45,男,4000,数学,80
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50
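The join can also be written with the Table API; join, leftOuterJoin, rightOuterJoin and fullOuterJoin are the counterparts of the SQL variants in this and the following sections. The following is only a sketch under the assumption that the user and grade tables from the program above are registered; the grade.name column is renamed (to the illustrative alias 'course) because the Table API expects distinct field names on the two join inputs.

// Table API sketch of the INNER JOIN above
val users  = tableEnv.scan("user")
val grades = tableEnv.scan("grade").select('userId, 'name as 'course, 'fraction)

val joined = users
  .join(grades, 'id === 'userId)   // like ON `user`.id = grade.userId
  .select('id, 'name, 'age, 'sex, 'salary, 'course, 'fraction)

joined.first(100).print()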

left join

  • Feature: join two tables on the given columns; every row of the left table is emitted, with nulls on the right side when there is no match.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.leftJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50) )

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)

    // left join: every row of the left table is matched against the right table; one output row is
    // produced per match, and a row with nulls on the right side is produced when there is no match
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user`  LEFT JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()

    /**
      * Output
      *
      * 1,小明,15,男,1500,语文,100
      * 1,小明,15,男,1500,外语,50
      * 2,小王,45,男,4000,数学,80
      * 4,小慧,35,女,500,null,null
      * 3,小李,25,女,800,null,null
      */
  }
}
  • Output
1,小明,15,男,1500,语文,100
1,小明,15,男,1500,外语,50
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
3,小李,25,女,800,null,null

right join

  • Feature: join two tables on the given columns; every row of the right table is emitted, with nulls on the left side when there is no match.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.rightJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90) )

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)

    // right join: every row of the right table is emitted; left-side columns are null when there is no match
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user`  RIGHT JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()

    /**
      * Output
      *
      * 1,小明,15,男,1500,外语,50
      * 1,小明,15,男,1500,语文,100
      * 2,小王,45,男,4000,数学,80
      * null,null,null,null,null,外语,90
      */
  }
}
  • Output
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
null,null,null,null,null,外语,90

full outer join

  • Feature: join two tables on the given columns; a row from either table is always emitted, with nulls for the side that has no match.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSetGrade = env.fromElements((1,"语文",100),(2,"数学",80),(1,"外语",50),(10,"外语",90) )

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)

    // full outer join: rows from both sides are emitted, with nulls where there is no match
    tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` FULL OUTER JOIN  grade on  `user`.id = grade.userId ")
      .first(100).print()

    /**
      * Output
      *
      * 3,小李,25,女,800,null,null
      * 1,小明,15,男,1500,外语,50
      * 1,小明,15,男,1500,语文,100
      * 2,小王,45,男,4000,数学,80
      * 4,小慧,35,女,500,null,null
      * null,null,null,null,null,外语,90
      */
  }
}
  • Output
3,小李,25,女,800,null,null
1,小明,15,男,1500,外语,50
1,小明,15,男,1500,语文,100
2,小王,45,男,4000,数学,80
4,小慧,35,女,500,null,null
null,null,null,null,null,外语,90

Set Operations

union

  • Feature: combine the rows of two tables, removing duplicates.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)

    /**
      * UNION combines the two tables and removes duplicates
      */
    tableEnv.sqlQuery(
      "select * from ( select t1.* FROM `user` as t1 ) " +
        " UNION " +
        " ( select t2.* FROM t2 )"
    )
      .first(100).print()

    /**
      * Output
      *
      * 30,小李,25,女,800
      * 40,小慧,35,女,500
      * 2,小王,45,男,4000
      * 4,小慧,35,女,500
      * 3,小李,25,女,800
      * 1,小明,15,男,1500
      */
  }
}
  • Output
30,小李,25,女,800
40,小慧,35,女,500
2,小王,45,男,4000
4,小慧,35,女,500
3,小李,25,女,800
1,小明,15,男,1500
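The Table API has direct counterparts for the set operations covered in this part of the article: union (removes duplicates), unionAll, intersect and minus (the EXCEPT equivalent). The following is a hedged sketch, assuming the user and t2 tables registered as in the programs of this section; it is not taken from the original post.

// Table API sketch of the SQL set operations
val t1 = tableEnv.scan("user")
val t2 = tableEnv.scan("t2")

val unioned    = t1.union(t2)      // UNION: duplicates removed
val unionedAll = t1.unionAll(t2)   // UNION ALL: duplicates kept
val common     = t1.intersect(t2)  // INTERSECT
val difference = t1.minus(t2)      // EXCEPT

unioned.first(100).print()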

unionAll

  • Feature: combine the rows of two tables, keeping duplicates.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)

    /**
      * UNION ALL combines the two tables without removing duplicates
      */
    tableEnv.sqlQuery(
      "select * from ( select t1.* FROM `user` as t1 ) " +
        " UNION ALL " +
        " ( select t2.* FROM t2 )"
    )
      .first(100).print()

    /**
      * Output
      *
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      * 3,小李,25,女,800
      * 4,小慧,35,女,500
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      * 30,小李,25,女,800
      * 40,小慧,35,女,500
      */
  }
}
  • Output
1,小明,15,男,1500
2,小王,45,男,4000
3,小李,25,女,800
4,小慧,35,女,500
1,小明,15,男,1500
2,小王,45,男,4000
30,小李,25,女,800
40,小慧,35,女,500

INTERSECT

  • Feature: INTERSECT returns the rows that appear in both tables (the overlap).
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)

    /**
      * INTERSECT returns the rows that appear in both tables (the overlap)
      */
    tableEnv.sqlQuery(
      "select * from ( select t1.* FROM `user` as t1 ) " +
        " INTERSECT " +
        " ( select t2.* FROM t2 )"
    )
      .first(100).print()

    /**
      * Output
      *
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      */
  }
}
  • Output
1,小明,15,男,1500
2,小王,45,男,4000

in

  • Feature: subquery with IN.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)

    /**
      * IN subquery
      */
    tableEnv.sqlQuery(
      "select t1.* FROM `user` t1  where t1.id in " +
        " (select t2.id from t2) "
    )
      .first(100).print()

    /**
      * Output
      *
      * 1,小明,15,男,1500
      * 2,小王,45,男,4000
      */
  }
}
  • Output
1,小明,15,男,1500
2,小王,45,男,4000

EXCEPT

  • Feature: EXCEPT returns the rows of the first table that are not present in the second table.
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.except

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
    val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the tables
    tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
    tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)

    /**
      * EXCEPT returns the rows of the first table that are not present in the second table
      */
    tableEnv.sqlQuery(
      "select * from ( select t1.* FROM `user` as t1 ) " +
        " EXCEPT " +
        " ( select t2.* FROM t2 )"
    )
      .first(100).print()

    /**
      * Output
      *
      * 3,小李,25,女,800
      * 4,小慧,35,女,500
      */
  }
}
  • Output
3,小李,25,女,800
4,小慧,35,女,500

DML

insert into

  • Feature: insert the data of a table (source) into a CSV file (sink).
  • Scala program
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.insert

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object Run {

  def main(args: Array[String]): Unit = {

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))

    // get the Table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the source table
    tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)

    // create a TableSink
    val csvSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE)
    val fieldNames = Array("name", "age", "sex")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.STRING)
    tableEnv.registerTableSink("t2",fieldNames,fieldTypes,csvSink)

    tableEnv.sqlUpdate(s" insert into  t2 select name,age,sex FROM user1  ")

    env.execute()

    /**
      * Output written to a.csv
      *
      * 小明,15,男
      * 小王,45,男
      * 小李,25,女
      * 小慧,35,女
      */
  }
}
  • Output file a.csv
小明,15,男
小王,45,男
小李,25,女
小慧,35,女
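To check the result of the sink, the written CSV file can be registered again as a source table and queried. The sketch below is an addition of mine rather than part of the original post; it assumes CsvTableSource from the Flink table sources, the same path and schema used by the sink above, and the imports already present in the program above.

// read the file written by the CsvTableSink back in (sketch)
import org.apache.flink.table.sources.CsvTableSource

val csvSource = CsvTableSource.builder()
  .path("sink-data/csv/a.csv")
  .field("name", Types.STRING)
  .field("age", Types.INT)
  .field("sex", Types.STRING)
  .build()

tableEnv.registerTableSource("t3", csvSource)
tableEnv.sqlQuery("select name, age, sex FROM t3")
  .first(100).print()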

