本文整理汇总了Scala中org.apache.kafka.clients.consumer.OffsetAndMetadata类的典型用法代码示例。如果您正苦于以下问题:Scala OffsetAndMetadata类的具体用法?Scala OffsetAndMetadata怎么用?Scala OffsetAndMetadata使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了OffsetAndMetadata类的1个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: TwitterSinkTask
//设置package包名称以及导入依赖的类
package com.eneco.trading.kafka.connect.twitter
import java.util
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
import scala.collection.JavaConverters._
import scala.util.{Success, Failure}
class TwitterSinkTask extends SinkTask with Logging {
var writer: Option[SimpleTwitterWriter] = None
override def start(props: util.Map[String, String]): Unit = {
val sinkConfig = new TwitterSinkConfig(props)
writer = Some(new TwitterWriter(
sinkConfig.getString(TwitterSinkConfig.CONSUMER_KEY_CONFIG),
sinkConfig.getPassword(TwitterSinkConfig.CONSUMER_SECRET_CONFIG).value,
sinkConfig.getString(TwitterSinkConfig.TOKEN_CONFIG),
sinkConfig.getPassword(TwitterSinkConfig.SECRET_CONFIG).value))
}
override def put(records: util.Collection[SinkRecord]): Unit =
records.asScala
.map(_.value.toString)
.map(text => (text, writer match {
case Some(writer) => writer.updateStatus(text)
case None => Failure(new IllegalStateException("twitter writer is not set"))
}))
.foreach {
case (text, result) => result match {
case Success(id) => log.info(s"successfully tweeted `${text}`; got assigned id ${id}")
case Failure(err) => log.warn(s"tweeting `${text}` failed: ${err.getMessage}")
}
}
override def stop(): Unit = {
}
override def flush(map: util.Map[TopicPartition, OffsetAndMetadata]) = {
}
override def version(): String = ""
}
开发者ID:heroku,项目名称:kafka-tweet-producer,代码行数:43,代码来源:TwitterSinkTask.scala
注:本文中的org.apache.kafka.clients.consumer.OffsetAndMetadata类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论