1.知识点
- scala输入输出样例类
- keyBy并行度为1盘算UV的技巧
map(data => ("uv", data.userId))..keyBy(_._1)
自界说MapFunction,随机自界说key+"uv"
Random.nextString(10) + "uv"
- WindowedStream.trigger的使用
trigger触发器,每来一条数据直接清空窗口,放到redis进行盘算
- trigger返回WindowedStream,继续调用process(ProcessWindowFunction)
- WindowedStream.process()的使用
windowStream调用接口
- 布隆过滤器的实现
2.业务目的
滚动输出最近1小时内的PV
窗口:1小时
指标:点击量
3.流程心法
总流程:创建输入输出类--->实验环境--->transform转换--->各类窗口函数的调用
主Object:
1.创建实验环境,设置时间语义,并行度等
2.transform api map转换为输入样例类,并设置watermark
3.key 界说成常量"v",那么keyBy就分为同一组,如果并行则可以自界说mapFunction
4.实现trigger
5.实现processWindowFunction
4.模块详解
4.1 创建输入输出样例类
4.2 主object实现
4.2.1 创建实验环境并添加数据源
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 从文件中读取数据 val resource = getClass.getResource("/UserBehavior.csv") val inputStream: DataStream[String] = env.readTextFile(resource.getPath)4.2.2 Datastream map转换为输入样例类
// 转换成样例类类型并提取时间戳和watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val arr = data.split(",") UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L)4.2.3 处置惩罚逻辑(1)----filter类型,timeWindow
val uvStream = dataStream .filter(_.behavior == "pv") .map( data => ("uv", data.userId) ) //如果要并行,并行自界说mymapper .keyBy(_._1) .timeWindow(Time.hours(1)) //滚动窗口 .trigger(new MyTrigger()) //trigger触发器,每来一条数据直接清空窗口,放到redis盘算。 .process( new UvCountWithBloom() )4.2.4 处置惩罚逻辑(2)----Trigger实现
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{ override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.FIRE_AND_PURGE } //体系时间有希望时做什么操作 override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE //watermark改变做什么操作 override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { }}4.2.5 处置惩罚逻辑(2)----ProcessWindowFunction实现
1.界说redis中存储位图的key ,本例为窗口竣事时间
2.界说一个redis hash表,保存统计之后的每个窗口竣事时间的uv count.
表名:uvcount
KEY: 窗口竣事时间
VALUE:uv count值
3. 对userid进行hash, 从位图中查看hash后的偏移量是否窜在,若存在则uvcount不操作。若不存在则uvcount+1,位图也相应更新
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{ // 界说redis连接以及布隆过滤器 lazy val jedis = new Jedis("localhost", 6379) lazy val bloomFilter = new Bloom(1<<29) // 2的29次方,1左移29位。 位的个数:2^6(64) * 2^20(1M) * 2^3(8bit) ,64MB // 本来是收集齐备部数据、窗口触发盘算的时候才会调用;如今每来一条数据都调用一次 override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = { // 先界说redis中存储位图的key val storedBitMapKey = context.window.getEnd.toString //别的将当前窗口的uv count值,作为状态保存到redis里,用一个叫做uvcount的hash表来保存(windowEnd,count) val uvCountMap = "uvcount" val currentKey = context.window.getEnd.toString var count = 0L // 从redis中取出当前窗口的uv count值 if(jedis.hget(uvCountMap, currentKey) != null) count = jedis.hget(uvCountMap, currentKey).toLong // 去重:判定当前userId的hash值对应的位图位置,是否为0 val userId = elements.last._2.toString // 盘算hash值,就对应着位图中的偏移量 val offset = bloomFilter.hash(userId, 61) val isExist = jedis.getbit(storedBitMapKey, offset) if(!isExist){ // 如果不存在,那么位图对应位置置1,并且将count值加1 jedis.setbit(storedBitMapKey, offset, true) jedis.hset(uvCountMap, currentKey, (count + 1).toString) } }}4.2.6 处置惩罚逻辑(3)----布隆过滤器实现
也可以调用外部google等现成的布隆过滤器.
计划布隆过滤器的要点:
1.选好点的hash函数
2.不同userid经过hash到同一位上。不要那么稠密。
即1亿的user,我们给出2亿的位,出现碰撞的概率就特别小。
10B * 1亿,大概1GB, 用位来存,1bit * 1亿 大概10m,放redis放内存都是个很好的 选择。
纵然我们扩大位防止碰撞,放6亿,也是68M,可以放到redis中。有大概出现hash碰撞
class Bloom(size: Long) extends Serializable{ private val cap = size // 默认cap应该是2的整次幂 //hash函数 value即userid,seed随机数种子 def hash(value: String, seed: Int): Long = { var result = 0 //遍历userid,对每一位进行随机数种子的处置惩罚 for( i <- 0 until value.length ){ result = result * seed + value.charAt(i) } // 返回hash值,要映射到cap范围内 (cap - 1) & result }}4.3 完备代码
|