现在是2020年9月,由于目前还没有Spark整合ClickHouse的连接器,所以通过spark读写ClickHouse的方式只能是jdbc了,另外github上有个连接器,需要自己打包发布,感兴趣的可以研究下,地址https://github.com/wangxiaojing/spark-clickhouse

以下是spark读写clickHouse的代码:

/*
  读取
   */
  def select(spark:SparkSession): Unit ={
    spark.read
      .format("jdbc")
      .option("driver","ru.yandex.clickhouse.ClickHouseDriver")
      .option("url", "jdbc:clickhouse://hadoop:8124/tutorial")
      .option("dbtable", "test")
      .load().show()
    spark.stop()
  }
/*
  写入
   */
  def insert(spark:SparkSession): Unit ={
    //clickhouse客户端配置
    val pro = new java.util.Properties
    pro.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    //创建数据
    import spark.implicits._
    val df = Seq(Person("yyy",19)).toDS
    //写入clickhouse
    df.write
      .mode(SaveMode.Append)
      .option("batchsize", "20000")
      .option("isolationLevel", "NONE")
      .option("numPartitions", "1")
      .jdbc("jdbc:clickhouse://hadoop:8124/tutorial","test",pro)
    spark.stop()
  }

这里注意如果表中的字段没设置为可为空,那么写入空值会报错,而且错误不易发现

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐