1. Background
The Spark community provides Spark Streaming connectors for many data sources, but some of the less common ones are not covered. Because of our company's technology-stack choices, we use Alibaba Cloud's MQ service ONS, so to meet our real-time requirements we had to write our own Receiver.
2. Implementation
1. The official example is fairly detailed, but getting it to work in practice still takes some step-by-step debugging; see the official documentation.
2. The implementation consists of three parts: the receiver, the input stream, and a util object.
3. Receiver code:
```scala
import java.io.Serializable
import java.util.Properties

import com.aliyun.openservices.ons.api._
import com.aliyun.openservices.ons.api.impl.ONSFactoryImpl
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class OnsReceiver(
    cid: String,
    accessKey: String,
    secretKey: String,
    addr: String,
    topic: String,
    tag: String,
    func: Message => Array[Byte])
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2)
  with Serializable with Logging {
  receiver =>

  private var consumer: Consumer = null
  private var workerThread: Thread = null

  override def onStart(): Unit = {
    // Start the ONS consumer on a dedicated daemon thread so onStart()
    // returns immediately, as the Receiver contract requires.
    workerThread = new Thread(new Runnable {
      override def run(): Unit = {
        val properties = new Properties
        properties.put(PropertyKeyConst.ConsumerId, cid)
        properties.put(PropertyKeyConst.AccessKey, accessKey)
        properties.put(PropertyKeyConst.SecretKey, secretKey)
        properties.put(PropertyKeyConst.ONSAddr, addr)
        properties.put(PropertyKeyConst.MessageModel, "CLUSTERING")
        properties.put(PropertyKeyConst.ConsumeThreadNums, "50")
        val onsFactoryImpl = new ONSFactoryImpl
        consumer = onsFactoryImpl.createConsumer(properties)
        consumer.subscribe(topic, tag, new MessageListener() {
          override def consume(message: Message, context: ConsumeContext): Action = {
            try {
              // Convert the message with the user-supplied func and hand
              // the bytes to Spark; only then ack the message to ONS.
              receiver.store(func(message))
              Action.CommitMessage
            } catch {
              case e: Throwable =>
                e.printStackTrace()
                Action.ReconsumeLater
            }
          }
        })
        consumer.start()
      }
    })
    workerThread.setName(s"Aliyun ONS Receiver $streamId")
    workerThread.setDaemon(true)
    workerThread.start()
  }

  override def onStop(): Unit = {
    if (workerThread != null) {
      if (consumer != null) {
        consumer.shutdown()
      }
      workerThread.join()
      workerThread = null
      logInfo(s"Stopped receiver for streamId $streamId")
    }
  }
}
```
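The `func` argument controls how an ONS `Message` is converted into the `Array[Byte]` that the receiver stores in Spark. A minimal sketch, assuming you only need the raw message body (`getBody` is the standard ONS accessor):

```scala
import com.aliyun.openservices.ons.api.Message

// Keep only the raw message body; if the downstream job also needs the
// topic, key, or user properties, serialize them together here.
val func: Message => Array[Byte] = (msg: Message) => msg.getBody
```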
InputDStream code:
```scala
import com.aliyun.openservices.ons.api.Message
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class OnsInputDStream(
    @transient _ssc: StreamingContext,
    cid: String,
    topic: String,
    tag: String,
    accessKey: String,
    secretKey: String,
    addr: String,
    func: Message => Array[Byte])
  extends ReceiverInputDStream[Array[Byte]](_ssc) {

  override def getReceiver(): Receiver[Array[Byte]] = {
    new OnsReceiver(cid, accessKey, secretKey, addr, topic, tag, func)
  }
}
```
Util code:
```scala
import com.aliyun.openservices.ons.api.Message
import org.apache.spark.annotation.Experimental
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object OnsUtils {

  @Experimental
  def createStream(
      ssc: StreamingContext,
      cid: String,
      topic: String,
      tag: String,
      accessKey: String,
      secretKey: String,
      addr: String,
      func: Message => Array[Byte]): ReceiverInputDStream[Array[Byte]] = {
    new OnsInputDStream(ssc, cid, topic, tag, accessKey, secretKey, addr, func)
  }

  @Experimental
  def createStreams(
      ssc: StreamingContext,
      consumerIdTopicTags: Array[(String, String, String)],
      accessKey: String,
      secretKey: String,
      addr: String,
      func: Message => Array[Byte]): DStream[Array[Byte]] = {
    // Reject duplicate (consumerId, topic) pairs, and reject one consumerId
    // subscribing to more than one topic.
    val invalidTuples1 =
      consumerIdTopicTags.groupBy(e => (e._1, e._2)).filter(e => e._2.length > 1)
    val invalidTuples2 =
      consumerIdTopicTags.map(e => (e._1, e._2)).groupBy(e => e._1).filter(e => e._2.length > 1)
    // Any invalid group at all is an error (the original check used
    // `.size > 1`, which missed the single-duplicate case).
    if (invalidTuples1.nonEmpty || invalidTuples2.nonEmpty) {
      throw new RuntimeException("Inconsistent consumer subscription.")
    } else {
      ssc.union(consumerIdTopicTags.map({ case (consumerId, topic, tags) =>
        createStream(ssc, consumerId, topic, tags, accessKey, secretKey, addr, func)
      }))
    }
  }
}
```
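For completeness, a sketch of calling `createStreams` with several (consumerId, topic, tag) tuples. The IDs and topics below are placeholders, and `ssc`, `config`, and `func` are assumed to be defined as in the usage section below:

```scala
// Placeholder consumer IDs and topics; one receiver is created per tuple.
// Note: the same consumerId must not appear with two different topics,
// or createStreams throws "Inconsistent consumer subscription."
val tuples = Array(
  ("CID_A", "TOPIC_A", "tag_a"),
  ("CID_B", "TOPIC_B", "tag_b"))

val unioned = OnsUtils.createStreams(
  ssc, tuples,
  config.getString("ons.access_key"),
  config.getString("ons.sercet_key"),
  config.getString("ons.ons_addr"),
  func)
```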
3. Usage
```scala
val stream = (0 until 3).map(i => {
  OnsUtils.createStream(
    ssc,
    "CID",
    "BI_CALL",
    "call_log_ons",
    config.getString("ons.access_key"),
    config.getString("ons.sercet_key"),
    config.getString("ons.ons_addr"),
    func)
})
val unionStream = ssc.union(stream).foreachRDD(...)
```
The size of `stream` determines how many receivers are created. This number must be less than or equal to `num-executors` when running Spark on YARN, because each receiver permanently occupies an executor; by default the received data takes up half of an executor's memory.
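As a rough illustration of that sizing rule, the three-receiver job above could reserve executors as follows. This is a sketch only; the instance count, memory size, and app name are illustrative, not taken from the original job:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Three receivers each pin one executor for the lifetime of the job,
// so request more executors than receivers to leave room for processing.
val conf = new SparkConf()
  .setAppName("ons-streaming")          // hypothetical app name
  .set("spark.executor.instances", "6") // >= 3 receivers, plus workers
  .set("spark.executor.memory", "4g")

val ssc = new StreamingContext(conf, Seconds(10))
```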