Real-time daily pv/uv statistics with Spark Streaming and Redis, with results stored in MySQL for front-end display

Recently there was a requirement to compute pv and uv in real time and display the results by date, hour, pv, and uv, counting per day and starting over the next day. In practice the statistics are also broken down by a type field, for example displayed by date, hour, pv, uv, and type, but only the basic pv/uv display is introduced here.

id uv pv date hour
1 155599 306053 2018-07-27 18

For an explanation of pv and uv, see this post: https://blog.csdn.net/petermsh/article/details/78652246

1, Project Flow

Log data is collected with Flume; one branch lands in HDFS for offline use, and another is sunk into Kafka. Spark Streaming pulls the data from Kafka, computes pv and uv (uv is deduplicated with a Redis set), and finally writes the results to a MySQL database for the front end to display.

2, Specific Process

1) Calculation of pv

There are two ways to pull data from Kafka: receiver-based and direct. The direct approach is used here. State is saved with the mapWithState operator, which plays the same role as updateStateByKey but performs better. Of course, in practice the data has to be cleaned and filtered before it can be used.
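For reference, here is a minimal sketch of what the direct pull might look like with the spark-streaming-kafka-0-10 API, assuming ssc is the StreamingContext (created as in the checkpoint section below). The broker list, topic, group id, and record format are assumptions based on how the data is used later, not the project's actual code.

// Direct (receiver-less) Kafka stream; values are assumed, not the project's real settings
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092", // assumed broker list
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pv_uv_group",                        // assumed consumer group
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Set("helper_log"), kafkaParams))

// Assume each record value is "yyyy-MM-dd HH:mm:ss<TAB>helperversion<TAB>guid";
// key the stream as "yyyy-MM-dd:HH" + "\t" + helperversion so the later
// split(":") / split("\t") calls work, and keep the guid as the value.
val helper_data = stream.map(_.value().split("\t")).map { fields =>
  val datehour = fields(0).substring(0, 10) + ":" + fields(0).substring(11, 13)
  (datehour + "\t" + fields(1), fields(2))
}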

Define a state function

// State-update function for the real-time traffic counts
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
  val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
  val output = (datehour, accuSum)
  state.update(accuSum)
  output
}

// Calculate pv
val stateSpec = StateSpec.function(mapFunction)
val helper_count_all = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

With this, pv is computed easily.

2) Calculation of uv

uv has to be deduplicated over the whole day. With every incoming batch, doing this natively with reduceByKey or groupByKey would demand too much from our modest hardware, so we requested a 93 GB Redis instance for deduplication. The principle: for each incoming record, use the date as the key and add the guid to a Redis set; every 20 seconds (the refresh interval), take the size of the set with scard and update the database.

helper_data.foreachRDD(rdd => {
  rdd.foreachPartition(eachPartition => {
    // Get a Redis connection
    val jedis = getJedis
    eachPartition.foreach(x => {
      val date: String = x._1.split(":")(0)
      val key = date
      // Use the date as the key and add the guid (x._2) to the set
      jedis.sadd(key, x._2)
      // Expire each day's set so it is removed automatically and Redis capacity is not exceeded
      jedis.expire(key, ConfigFactory.rediskeyexists)
    })
    // Return the connection
    closeJedis(jedis)
  })
})
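getJedis and closeJedis above are the project's own helpers and are not shown in the post. A minimal sketch of what they might look like on top of a shared JedisPool, with the host, port, and pool sizes as assumptions:

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisClient {
  private val config = new JedisPoolConfig()
  config.setMaxTotal(100) // assumed pool size
  config.setMaxIdle(20)
  // assumed host/port; in the real project these would come from ConfigFactory
  private lazy val pool = new JedisPool(config, "redis-host", 6379)

  // Borrow a connection from the pool
  def getJedis: Jedis = pool.getResource

  // Return the connection to the pool
  def closeJedis(jedis: Jedis): Unit = if (jedis != null) jedis.close()
}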

3) Saving the results to the database

The results are saved to MySQL and the database is updated every 20 seconds; the front-end display refreshes on the same schedule and re-queries the database each time, which achieves the goal of showing pv and uv statistics in real time.

/**
  * Insert data
  * @param data (addTab(datehour) + helperversion)
  * @param tbName
  * @param colNames
  */
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
  data.foreachRDD(rdd => {
    val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
    if (!rdd.isEmpty()) {
      val hour_now = tmp_rdd.max() // largest hour in the current results; useful during data recovery
      rdd.foreachPartition(eachPartition => {
        try {
          val jedis = getJedis
          val conn = MysqlPoolUtil.getConnection()
          conn.setAutoCommit(false)
          val stmt = conn.createStatement()
          eachPartition.foreach(x => {
            val datehour = x._1.split("\t")(0)
            val helperversion = x._1.split("\t")(1)
            val date_hour = datehour.split(":")
            val date = date_hour(0)
            val hour = date_hour(1).toInt

            val colName0 = colNames(0) // date
            val colName1 = colNames(1) // hour
            val colName2 = colNames(2) // count_all
            val colName3 = colNames(3) // count
            val colName4 = colNames(4) // helperversion
            val colName5 = colNames(5) // datehour
            val colName6 = colNames(6) // dh

            val colValue0 = addYin(date)
            val colValue1 = hour
            val colValue2 = x._2.toInt
            val colValue3 = jedis.scard(date + "_" + helperversion) // set key, e.g. 2018-07-08_10.0.1.22
            val colValue4 = addYin(helperversion)
            val colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
            val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

            var sql = ""
            if (hour == hour_now) { // uv is only updated for the current hour
              sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
            } else {
              sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5}) on duplicate key update ${colName2} = ${colValue2}"
            }
            stmt.addBatch(sql)
          })
          closeJedis(jedis)
          stmt.executeBatch() // execute the batched SQL statements
          conn.commit()
          conn.close()
        } catch {
          case e: Exception => {
            logger.error(e)
            logger2.error(HelperHandle.getClass.getSimpleName + e)
          }
        }
      })
    }
  })
}
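A hypothetical invocation might look like the following; the table name is made up, and the column names simply mirror the comments inside insertHelper, so they are assumptions rather than the project's actual schema.

// illustrative only: table/column names are assumed from the comments above
insertHelper(helper_count_all, "helper_pv_uv",
  "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")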
  
// Milliseconds from now until the end of the day (i.e. until midnight of the next day)
def resetTime = {
  val now = new Date()
  val todayEnd = Calendar.getInstance
  todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR is 12-hour; HOUR_OF_DAY is 24-hour
  todayEnd.set(Calendar.MINUTE, 59)
  todayEnd.set(Calendar.SECOND, 59)
  todayEnd.set(Calendar.MILLISECOND, 999)
  todayEnd.getTimeInMillis - now.getTime
}
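MysqlPoolUtil is also the project's own class and is not shown. A minimal sketch, using a plain DriverManager connection and placeholder URL and credentials (a real implementation would wrap an actual pool such as HikariCP or c3p0):

import java.sql.{Connection, DriverManager}

object MysqlPoolUtil {
  // placeholders; the real project would read these from its configuration
  private val url  = "jdbc:mysql://mysql-host:3306/stat?useUnicode=true&characterEncoding=utf8"
  private val user = "stat_user"
  private val pass = "******"

  Class.forName("com.mysql.jdbc.Driver")

  // NOTE: this hands out a new connection each time instead of pooling;
  // it only illustrates the interface used by insertHelper above
  def getConnection(): Connection = DriverManager.getConnection(url, user, pass)
}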

4) Data Fault Tolerance

When consuming a Kafka stream, the possibility of data loss has to be taken into account. Offsets and state can generally be saved to any storage system, such as MySQL, HDFS, HBase, Redis, or ZooKeeper. Here Spark Streaming's built-in checkpoint mechanism is used to recover when the application restarts.

checkpoint

With the checkpoint mechanism, after a restart or directly after a failure the application can resume the last unfinished task and read data from Kafka at the corresponding offsets, instead of starting over.

// Initialize the configuration
ConfigFactory.initConfig()

val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "24")
val sc = new SparkContext(conf)

while (true) {
  val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
  ssc.start()
  ssc.awaitTerminationOrTimeout(resetTime)
  ssc.stop(false, true)
}

Each day gets its own checkpoint directory; in the early hours of the next day the StreamingContext object is destroyed on schedule, and pv and uv are recomputed from scratch for the new day.
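getStreamingContext is the factory function handed to getOrCreate; it is not shown in the post. A minimal sketch of what it might do, where the 20-second batch interval and the placement of the stream-building logic are assumptions:

import org.apache.spark.streaming.{Seconds, StreamingContext}

def getStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(20))                   // assumed batch interval
  ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))  // the same per-day directory
  // build the Kafka direct stream here and wire up the pv/uv logic shown above
  ssc
}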

Note

ssc.stop(false, true) gracefully stops and destroys the StreamingContext object without destroying the SparkContext object; ssc.stop(true, true) also stops the SparkContext, and the whole program stops.

Application migration and program upgrades

During this project we made small upgrades from time to time, for example when a feature was incomplete or the logic was wrong. That means modifying the code and re-packaging the jar. If the program is simply stopped and restarted at this point, the new application will still read the old checkpoint, and two problems can occur:

    The old program keeps being executed, because the checkpoint holds the serialized old code; or

    Execution fails outright with a deserialization error.

In fact, sometimes the modified code takes effect even without deleting the checkpoint. After many tests I found that if the change alters the data-filtering logic, or changes how state is saved, the restart fails, and the only fix is to delete the checkpoint. But once the checkpoint is deleted, the unfinished batches from the previous run and the Kafka consumer offsets are lost, which directly causes data loss. In that case, here is what I usually do.

I usually run the new program on another cluster, or with a different checkpoint directory. We keep the code and configuration files separate, so changing the checkpoint location in the configuration file is easy. The two programs then run side by side; they differ only in checkpoint directory, the new one rebuilds its state from scratch, and both insert into the same database. After the new program has run for a while, the old one is simply stopped. I vaguely remember reading something like this online but cannot recall it clearly; this is the approach I came up with myself to guarantee data accuracy.
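For example, reading the checkpoint directory from an external properties file makes pointing an upgraded build at a fresh location a one-line change. A minimal sketch, where the file name and key are made up:

import java.util.Properties
import scala.io.Source

// hypothetical app.properties shipped next to the jar:
//   checkpoint.dir=hdfs:///user/stat/checkpoint_v2/
val props = new Properties()
props.load(Source.fromFile("app.properties").bufferedReader())
val checkpointBase = props.getProperty("checkpoint.dir")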

5) Logging

Logging uses log4j2: one copy is written locally, and ERROR-level entries are sent to my phone by e-mail.

val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// logger whose ERROR-level output is sent by e-mail
val logger2 = LogManager.getLogger("email")


WeChat official account

My WeChat official account focuses on big data analysis and mining; if you are interested, feel free to follow it and take a look!
