用dbeaver工具连接clickhouse数据库
建表语句:
-- Landing-page click log table.
-- Fixes vs. original: unbalanced quotes inside COMMENT strings removed
-- ('***'ID' -> '***ID', etc.) and the fused "MergeTreeORDER BY ldy_idSETTINGS"
-- clause split into valid ENGINE / ORDER BY / SETTINGS clauses.
CREATE TABLE kafka_clickhouse_os.ldy_pageclick
(
    `id`           String   COMMENT '***ID',
    `ldy_type`     String   COMMENT '***类型',
    `ldy_platform` String   COMMENT '***',
    `ldy_id`       Int64    COMMENT '***Id',
    `app_id`       String   COMMENT '***Id',
    `ip`           String   COMMENT 'ip***',
    `ua`           String   COMMENT '用户***',
    `click_time`   DateTime COMMENT '***时间',
    `channel_id`   Int64    COMMENT '***ID',
    `channel_name` String   COMMENT '***名字',
    `book_id`      Int64    COMMENT '***ID',
    `user_tag`     String   COMMENT '***ID',
    `book_name`    String   COMMENT '***名'
)
ENGINE = MergeTree
ORDER BY ldy_id
SETTINGS index_granularity = 8192
测试类:
import com.zw.bigdata.yd.land.kafka.AdConsumerKafkaSchema
import com.zw.bigdata.yd.land.pojo.{LandPvUvRecord, UniformAdvertRecord}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}

import java.sql.PreparedStatement

/**
 * Flink job that consumes landing-page PV/UV records from Kafka and writes
 * them into ClickHouse through the Flink JDBC sink.
 *
 * Fixes vs. the original (which was collapsed onto a few unreadable lines):
 *  - removed the unconditional `setStartFromEarliest()` call, which silently
 *    overrode the `offsetPosition` value already passed to the consumer factory;
 *  - removed the import of `AdAndLandStat.jobName`, which was shadowed by the
 *    local `jobName` member and therefore dead;
 *  - added the imports required by the checkpoint/state-backend configuration.
 */
object AdLandToCH {

  // Configuration values resolved from the environment-specific config file.
  val flinkCheckpointPath = EnvConfig.getConfigValue("yd.land.flink.checkout.path")
  val kafkaServer         = EnvConfig.getConfigValue("yd.land.kafka.bootstrap.servers")
  val kafkaGroupId        = EnvConfig.getConfigValue("yd.land.kafka.group.id")
  val topicAdName         = EnvConfig.getConfigValue("yd.land.dws.ad.kafka.topic.name")
  val topicPvUvName       = EnvConfig.getConfigValue("yd.land.dws.puv.kafka.topic.name")
  val jobName             = EnvConfig.getConfigValue("yd.land.job.name")
  val offsetPosition      = EnvConfig.getConfigValue("yd.land.offset.position")
  val landTableName       = EnvConfig.getConfigValue("yd.land.clickhouse.tablename")
  val adTableName         = EnvConfig.getConfigValue("yd.ad.clickhouse.tablename")
  val driverName          = EnvConfig.getConfigValue("yd.clickhouse.driverName")
  val database            = EnvConfig.getConfigValue("yd.clickhouse.databse")
  val clickhouseIp        = EnvConfig.getConfigValue("yd.clickhouse.Ip")
  val userName            = EnvConfig.getConfigValue("yd.clickhouse.userName")
  val password            = EnvConfig.getConfigValue("yd.clickhouse.password")

  // Kafka consumer timeout: 15 minutes in milliseconds.
  val timeoutKafka  = 15 * 60 * 1000
  val ODS_YD_PREFIX = "dwd_yd_land"

  /** Entry point: builds the environment, wires the sink, runs the job. */
  def main(args: Array[String]): Unit = {
    val environment = FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath)
    landPvUvToCkSink(environment)
    environment.execute(jobName)
  }

  /**
   * Builds a StreamExecutionEnvironment with checkpointing tuned per
   * deployment profile (`envActivate`: "prod", "test", or default/local).
   *
   * NOTE(review): `main` uses `FlinkExecutionEnvUtil.getStreamEnv`, not this
   * method — confirm whether this local copy is still needed.
   *
   * @param checkpointPath filesystem URI used for checkpoint storage
   * @return the configured environment
   */
  def getStreamEnv(checkpointPath: String): StreamExecutionEnvironment = {
    val checkpointTime = 1
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Match on the active profile to load the matching checkpoint settings.
    envActivate match {
      case "prod" =>
        env.enableCheckpointing(600000) // every 10 min
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
        env.getCheckpointConfig.setCheckpointTimeout(500000)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.getCheckpointConfig.enableExternalizedCheckpoints(
          CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
        env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath))
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(3)))

      case "test" =>
        env.enableCheckpointing(1200000) // every 20 min
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
        env.getCheckpointConfig.setCheckpointTimeout(1000000)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(5)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.getCheckpointConfig.enableExternalizedCheckpoints(
          CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
        env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath))
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)))

      case _ =>
        env.enableCheckpointing(60000) // every 1 min (local/default)
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
        env.getCheckpointConfig.setCheckpointTimeout(50000)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.getCheckpointConfig.enableExternalizedCheckpoints(
          CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
        env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath))
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)))
    }
    env
  }

  /**
   * Wires the Kafka PV/UV source to a ClickHouse JDBC sink.
   *
   * The consumer start position comes from the `offsetPosition` config value;
   * the original code called `setStartFromEarliest()` right after, which
   * silently overrode that setting — removed.
   *
   * @param env the stream environment to attach source and sink to
   */
  def landPvUvToCkSink(env: StreamExecutionEnvironment): Unit = {
    val landPuvConsumer =
      FlinkKafkaUtil.getFlinkKafkaConsume[LandPvUvRecord](
        kafkaServer, kafkaGroupId, topicPvUvName, offsetPosition,
        new AdConsumerKafkaSchema[LandPvUvRecord])

    val ydLandPuvDS = env.addSource(landPuvConsumer)

    val landAdvertSink: SinkFunction[LandPvUvRecord] = JdbcSink.sink(
      s"""insert into ${database}.${landTableName} (`landDate`,`landId`,`landName`,`landPosition`,`pv`,`uv`,`appName`,`appWx`,`bookId`,`channelBookId`,`bookName`) values(?,?,?,?,?,?,?,?,?,?,?)""",
      new JdbcStatementBuilder[LandPvUvRecord] {
        override def accept(ps: PreparedStatement, tp: LandPvUvRecord): Unit = {
          ps.setString(1, tp.landDate)
          ps.setString(2, tp.landId)
          ps.setString(3, tp.landName)
          ps.setString(4, tp.landPosition)
          ps.setInt(5, tp.pv)
          ps.setInt(6, tp.uv)
          // Optional fields are written as empty strings when absent.
          ps.setString(7, tp.appName.getOrElse(""))
          ps.setString(8, tp.appWx.getOrElse(""))
          ps.setString(9, tp.bookId.getOrElse(""))
          ps.setString(10, tp.channelBookId.getOrElse(""))
          ps.setString(11, tp.bookName.getOrElse(""))
        }
      },
      // NOTE(review): batch size 2 is very small for ClickHouse inserts;
      // consider a larger batch (and a flush interval) for throughput.
      new JdbcExecutionOptions.Builder().withBatchSize(2).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName(s"${driverName}")
        .withUrl(s"jdbc:clickhouse://${clickhouseIp}:8123/${database}?useUnicode=true&characterEncoding=utf-8&useSSL=false")
        .withUsername(s"${userName}")
        .withPassword(s"${password}")
        .build())

    ydLandPuvDS.print()
    ydLandPuvDS.addSink(landAdvertSink)
  }
}
加载配置文件的截图:(图片未随本文档附上)
样例类:
LandPvUvRecord:
package com.xx.bigdata.yd.land.pojo

/**
 * Immutable landing-page PV/UV record consumed from Kafka and written to
 * ClickHouse. (Original had the package clause fused onto the case-class
 * keyword — "pojocase class" — which does not compile; reformatted.)
 *
 * @param landDate      statistics date string
 * @param landId        landing-page id
 * @param appName       optional app name (empty when absent downstream)
 * @param appWx         optional app WeChat identifier
 * @param bookId        optional book id
 * @param channelBookId optional channel-scoped book id
 * @param bookName      optional book name
 * @param landName      landing-page name
 * @param landPosition  landing-page position
 * @param pv            page-view count
 * @param uv            unique-visitor count
 */
case class LandPvUvRecord(
    landDate: String,
    landId: String,
    appName: Option[String],
    appWx: Option[String],
    bookId: Option[String],
    channelBookId: Option[String],
    bookName: Option[String],
    landName: String,
    landPosition: String,
    pv: Int,
    uv: Int)
删除clickhouse库中的数据:
ClickHouse 清空数据:旧版本没有截断表语句,只能使用:ALTER TABLE 表名 DELETE WHERE 条件(注意该操作是异步的 mutation);较新版本可直接使用 TRUNCATE TABLE 表名。