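This article uses left_join only as an example; right_join, full_join, and so on follow the same principle, and readers can extend the approach themselves.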
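1. Experiment design
1. Provide two streams:
nameStream: user name records, read from port 9999
ageStream: user age records, read from port 9998
2. Left join nameStream with ageStream, merge the information from both streams, and output the result.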
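2. Methods
1 [Simple]: use coGroup to perform the left join.
2 [Complex]: join nameStream with (nameStream.union(ageStream)), group the result by id, and check the number of records in each group.
If the count is even, the same id matched twice: once as left*left and once as left*right. Return only the non-self match, i.e. left*right.
If the count is odd, there is only a single left*left match, so return it directly.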
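The two methods can be modelled on plain Scala collections to make the matching rules concrete. This is a minimal sketch, not the Flink job itself: `Person`, `LeftJoinSketch`, `merge`, `leftJoinByCoGroup`, and `leftJoinByUnion` are hypothetical names introduced here, each `Seq` stands in for the contents of one window, and method 2 assumes at most one record per id on each side.

```scala
// Simplified stand-in for the article's record type (hypothetical).
final case class Person(id: Int, name: String = "", age: Int = 0)

object LeftJoinSketch {
  // Combine a name record (left) with an age record (right) for the same id.
  def merge(left: Person, right: Person): Person = left.copy(age = right.age)

  // Method 1: coGroup-style. For each key, look at the left group and the
  // right group together; an empty right group means "emit left unchanged".
  def leftJoinByCoGroup(names: Seq[Person], ages: Seq[Person]): Seq[Person] =
    names.map(_.id).distinct.flatMap { id =>
      val lefts  = names.filter(_.id == id)
      val rights = ages.filter(_.id == id)
      if (rights.isEmpty) lefts
      else for (l <- lefts; r <- rights) yield merge(l, r)
    }

  // Method 2: inner join names against (names union ages), then use the
  // per-id match count to tell "left*left only" from "left*left + left*right".
  def leftJoinByUnion(names: Seq[Person], ages: Seq[Person]): Seq[Person] = {
    // The Boolean flag records whether a record came from the right stream.
    val union: Seq[(Person, Boolean)] =
      names.map(p => (p, false)) ++ ages.map(p => (p, true))

    // Inner join on id: every name record matches itself at least once.
    val joined: Seq[(Person, Person, Boolean)] =
      for (l <- names; (u, fromRight) <- union if u.id == l.id)
        yield (l, u, fromRight)

    names.map(_.id).distinct.flatMap { id =>
      val group = joined.filter(_._1.id == id)
      if (group.size % 2 == 0)
        // Even count: keep only the non-self match (left*right).
        group.collect { case (l, r, true) => merge(l, r) }
      else
        // Odd count: only left*left exists, so return the left record as is.
        group.collect { case (l, _, false) => l }
    }
  }
}
```

With names for ids 1, 2, 3 and ages for ids 1, 3, 4, both functions keep all three name records, fill in the age for ids 1 and 3, and drop id 4. The parity check in method 2 relies on the one-record-per-id assumption: a second age record for the same id would make the count odd and the left*right matches would be lost, which is one reason the coGroup approach is the simpler of the two.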
3. Code
Shared objects:
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature

import scala.beans.BeanProperty

// Shared constants: sample names and ages, plus the timestamp format.
object Const {
  final val names = List("wang", "li", "zhao", "qian", "sun")
  final val ages = List(0, 10, 20, 30)
  // Note: SimpleDateFormat is not thread-safe.
  final val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}

// Record type shared by both streams. @BeanProperty generates the
// getters and setters that fastjson uses for serialization.
case class People(
  @BeanProperty var id: Int = 0,
  @BeanProperty var name: String = "",
  @BeanProperty var age: Int = 0,
  @BeanProperty var eventTime: Long = 0,
  @BeanProperty var eventTimeStr: String = ""
) {
  // Render the record as a JSON string.
  override def toString: String = {
    JSON.toJSONString(this, SerializerFeature.QuoteFieldNames)
  }
}

object People {
  // Build a record for nameStream: id and name, stamped with the current time.
  def newPeopleByName(id: Int, name: String): People = {
    val than = new People()
    val now = new Date()
    than.id = id
    than.name = name
    than.eventTime = now.getTime
    than.eventTimeStr = Const.dateFormat.format(now)
    than
  }

  // Build a record for ageStream: id and age, stamped with the current time.
  def newPeopleByAge(id: Int, age: Int): People = {
    val than = new People()
    val now = new Date()
    than.id = id
    than.age = age
    than.eventTime = now.getTime
    than.eventTimeStr = Const.dateFormat.format(now)
    than
  }
}