• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala RecordMetadata类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.kafka.clients.producer.RecordMetadata的典型用法代码示例。如果您正苦于以下问题：Scala RecordMetadata类的具体用法？Scala RecordMetadata怎么用？Scala RecordMetadata使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了RecordMetadata类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: sendToKafkaWithNewProducer

//设置package包名称以及导入依赖的类
package pl.touk.nussknacker.engine.kafka

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

/**
 * Small helper for publishing raw byte messages to Kafka and exposing the outcome
 * as a Scala `Future`.
 *
 * Implementors provide `kafkaConfig`; `createProducer` turns it into producer
 * properties through `KafkaEspUtils.toProducerProperties`.
 */
trait EspSimpleKafkaProducer {
  val kafkaConfig: KafkaConfig

  /**
   * Sends one record through a producer created only for this call. The producer is
   * closed before this method returns.
   *
   * NOTE(review): whether the returned future is already completed when the producer
   * has been closed depends on the semantics of `KafkaProducer.close()` - confirm.
   *
   * @param topic destination topic
   * @param key   record key bytes
   * @param value record value bytes
   * @return future completed by the producer callback
   */
  def sendToKafkaWithNewProducer(topic: String, key: Array[Byte], value: Array[Byte]): Future[RecordMetadata] = {
    // Created outside the `try`: when construction fails there is nothing to close.
    val freshProducer = createProducer()
    try sendToKafka(topic, key, value)(freshProducer)
    finally freshProducer.close()
  }

  /**
   * Sends one record with the supplied producer.
   *
   * A method with this signature already exists in
   * "net.cakesolutions" %% "scala-kafka-client" % "0.9.0.0", but that dependency
   * could not be added easily, hence this local version.
   *
   * @param producer producer used for the send; it is not closed here
   * @return future that succeeds with the record metadata or fails with the exception
   *         reported to the producer callback
   */
  def sendToKafka(topic: String, key: Array[Byte], value: Array[Byte])(producer: KafkaProducer[Array[Byte], Array[Byte]]): Future[RecordMetadata] = {
    val sendResult = Promise[RecordMetadata]()
    val record = new ProducerRecord(topic, key, value)
    producer.send(record, producerCallback(sendResult))
    sendResult.future
  }

  /** Builds a byte-array producer from `kafkaConfig`. */
  def createProducer(): KafkaProducer[Array[Byte], Array[Byte]] =
    new KafkaProducer[Array[Byte], Array[Byte]](KafkaEspUtils.toProducerProperties(kafkaConfig))

  /** Adapts the Kafka callback to `promise`: a non-null exception fails it, otherwise the metadata completes it. */
  private def producerCallback(promise: Promise[RecordMetadata]): Callback =
    new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        Option(exception) match {
          case Some(error) => promise.complete(Failure(error))
          case None        => promise.complete(Success(metadata))
        }
    }
}
开发者ID:TouK,项目名称:nussknacker,代码行数:41,代码来源:EspSimpleKafkaProducer.scala


示例2: KafkaFeedsExporter

//设置package包名称以及导入依赖的类
package ru.fediq.scrapingkit.backend

import cakesolutions.kafka.KafkaProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import ru.fediq.scrapingkit.scraper.ScrapedEntity

import scala.concurrent.Future

/**
 * [[FeedExporter]] that publishes the dump of every scraped entity to one Kafka topic.
 *
 * @param bootstrapServer bootstrap server string passed to the producer configuration
 * @param topic           topic that receives the records
 */
class KafkaFeedsExporter(
  val bootstrapServer: String,
  val topic: String
) extends FeedExporter {

  // One producer instance, built with string serializers for both key and value.
  val producer = {
    val keySerializer = new StringSerializer()
    val valueSerializer = new StringSerializer
    KafkaProducer(KafkaProducer.Conf(keySerializer, valueSerializer, bootstrapServer))
  }

  /** Sends `entity.dump` to `topic` (no key) and returns the producer's future. */
  override def store[T <: ScrapedEntity](entity: T): Future[RecordMetadata] =
    producer.send(new ProducerRecord(topic, entity.dump))

  /** Closes the underlying producer. */
  override def close() = producer.close()
}
开发者ID:fediq,项目名称:scraping-kit,代码行数:22,代码来源:KafkaFeedsExporter.scala


示例3: RecordCallback

//设置package包名称以及导入依赖的类
package articlestreamer.kafka

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

/**
 * Kafka producer callback that reports the outcome of a send on the console:
 * successes go to standard output, failures to standard error.
 */
class RecordCallback extends Callback {

  /** A non-null `ex` is passed to `handleException`; otherwise the metadata is printed. */
  override def onCompletion(metadata: RecordMetadata, ex: Exception) =
    Option(ex) match {
      case Some(failure) => handleException(failure)
      case None          => println(s"Successfully sent message : $metadata")
    }

  /** Prints the failure to standard error. */
  def handleException(exception: Exception): Unit =
    Console.err.println(s"Error while attempting to send message : $exception")
}
开发者ID:firens,项目名称:article-streamer-aggregator,代码行数:19,代码来源:RecordCallback.scala


示例4: KProducer

//设置package包名称以及导入依赖的类
package org.parsec

import java.util.Properties
import java.util.concurrent.Future

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Convenient Kafka producer using avro4s
/**
 * Convenient Kafka producer using avro4s: keys and values are converted to Avro
 * `GenericRecord`s before being sent.
 *
 * @tparam K key type
 * @tparam V value type
 */
class KProducer[K <: Product, V <: Product] {

  val kafkaProps = new Properties()

  // Settings are applied in the listed order; key and value both use the Avro serializer.
  Seq(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer].getCanonicalName,
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer].getCanonicalName,
    "schema.registry.url" -> "http://localhost:8081"
  ).foreach { case (name, setting) => kafkaProps.put(name, setting) }

  // Lazy: the producer is only constructed on first access.
  private lazy val producer  = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)

  /**
   * Converts `key` and `value` with `RecordFormat` and sends them to `topic`.
   *
   * @param topic     destination topic
   * @param key       record key
   * @param value     record value
   * @param partition target partition, 0 by default
   * @return the Java future returned by the underlying producer
   */
  def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
    val avroKey = RecordFormat[K].to(key)
    val avroValue = RecordFormat[V].to(value)
    producer.send(new ProducerRecord[GenericRecord, GenericRecord](topic, partition, avroKey, avroValue))
  }

}
开发者ID:cryptocurrencyindia,项目名称:Parsec,代码行数:29,代码来源:KProducer.scala


示例5: FailingKafkaStorage

//设置package包名称以及导入依赖的类
package io.amient.affinity.testutil.storage

import java.nio.ByteBuffer
import java.util.concurrent.{Callable, Executors, Future}

import com.typesafe.config.Config
import io.amient.affinity.core.storage.kafka.KafkaStorage
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}


/**
 * Test storage that performs the real Kafka write but sometimes reports a simulated
 * failure to the caller.
 */
class FailingKafkaStorage(config: Config, partition: Int) extends KafkaStorage(config, partition) {

  // Single-threaded pool that runs the callable producing the (possibly failing) result.
  val executor = Executors.newFixedThreadPool(1)

  /**
   * Sends the record with `kafkaProducer`, then hands back a future computed on `executor`.
   * That future throws a `RuntimeException` whenever the current millisecond timestamp is
   * divisible by 10 at evaluation time; otherwise it waits for and returns the producer's result.
   */
  override def write(key: ByteBuffer, value: ByteBuffer): Future[RecordMetadata] = {
    val pendingSend: Future[RecordMetadata] = kafkaProducer.send(new ProducerRecord(topic, partition, key, value))

    val flakyResult: Callable[RecordMetadata] = new Callable[RecordMetadata]() {
      override def call(): RecordMetadata =
        if (System.currentTimeMillis() % 10 != 0) pendingSend.get
        else throw new RuntimeException("Simulated Exception in FailingKafkaStorage")
    }

    executor.submit(flakyResult)
  }

}
开发者ID:amient,项目名称:affinity,代码行数:30,代码来源:FailingKafkaStorage.scala


示例6: IssueOrchestratedEmail

//设置package包名称以及导入依赖的类
package com.ovoenergy.orchestration.kafka

import java.util.UUID

import com.ovoenergy.comms.model.email.OrchestratedEmailV3
import com.ovoenergy.comms.model._
import com.ovoenergy.orchestration.domain.EmailAddress
import org.apache.kafka.clients.producer.RecordMetadata

import scala.concurrent.Future

/**
 * Builds an `OrchestratedEmailV3` event from a triggered comm and hands it to `sendEvent`.
 *
 * @param sendEvent function that publishes the event and yields the resulting record metadata
 */
class IssueOrchestratedEmail(sendEvent: OrchestratedEmailV3 => Future[RecordMetadata])
    extends IssueOrchestratedComm[EmailAddress] {

  /**
   * @param customerProfile optional customer profile copied into the event
   * @param emailAddress    recipient; its `address` becomes the recipient email address
   * @param triggered       source event supplying metadata, template data and expiry
   * @return whatever `sendEvent` returns for the constructed event
   */
  def send(customerProfile: Option[CustomerProfile],
           emailAddress: EmailAddress,
           triggered: TriggeredV3): Future[RecordMetadata] = {
    val eventMetadata = MetadataV2.fromSourceMetadata(
      source = "orchestration",
      sourceMetadata = triggered.metadata
    )

    sendEvent(
      OrchestratedEmailV3(
        metadata = eventMetadata,
        recipientEmailAddress = emailAddress.address,
        templateData = triggered.templateData,
        // A fresh random UUID is generated for every issued event.
        internalMetadata = InternalMetadata(UUID.randomUUID.toString),
        expireAt = triggered.expireAt,
        customerProfile = customerProfile
      )
    )
  }

}
开发者ID:ovotech,项目名称:comms-orchestration,代码行数:34,代码来源:IssueOrchestratedEmail.scala


示例7: IssueOrchestratedSMS

//设置package包名称以及导入依赖的类
package com.ovoenergy.orchestration.kafka

import java.util.UUID

import com.ovoenergy.comms.model.sms.OrchestratedSMSV2
import com.ovoenergy.comms.model._
import com.ovoenergy.orchestration.domain.MobilePhoneNumber
import org.apache.kafka.clients.producer.RecordMetadata

import scala.concurrent.Future

/**
 * Builds an `OrchestratedSMSV2` event from a triggered comm and hands it to `sendEvent`.
 *
 * @param sendEvent function that publishes the event and yields the resulting record metadata
 */
class IssueOrchestratedSMS(sendEvent: OrchestratedSMSV2 => Future[RecordMetadata])
    extends IssueOrchestratedComm[MobilePhoneNumber] {

  /**
   * @param customerProfile optional customer profile copied into the event
   * @param mobileNumber    recipient; its `number` becomes the recipient phone number
   * @param triggered       source event supplying metadata, template data and expiry
   * @return whatever `sendEvent` returns for the constructed event
   */
  // The result type is spelled out so this public method's contract does not depend on inference.
  def send(customerProfile: Option[CustomerProfile],
           mobileNumber: MobilePhoneNumber,
           triggered: TriggeredV3): Future[RecordMetadata] = {
    val orchestratedSMSEvent = OrchestratedSMSV2(
      metadata = MetadataV2.fromSourceMetadata("orchestration", triggered.metadata),
      customerProfile = customerProfile,
      templateData = triggered.templateData,
      // A fresh random UUID is generated for every issued event.
      internalMetadata = InternalMetadata(UUID.randomUUID.toString),
      expireAt = triggered.expireAt,
      recipientPhoneNumber = mobileNumber.number
    )
    sendEvent(orchestratedSMSEvent)
  }
}
开发者ID:ovotech,项目名称:comms-orchestration,代码行数:27,代码来源:IssueOrchestratedSMS.scala


示例8: SimpleProducer

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka

import akka.NotUsed
import akka.kafka.ProducerMessage.Result
import akka.kafka.{ProducerMessage, ProducerSettings}
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

import scala.concurrent.{Future, Promise}

/**
 * Thin wrapper that sends single records with a Kafka producer created from
 * `producerSettings` and reports the outcome as a Scala `Future`.
 */
class SimpleProducer[K, V](producerSettings: ProducerSettings[K, V]) {
  private val producer = producerSettings.createKafkaProducer()

  /**
   * Sends `record`.
   *
   * @return future that fails with the exception passed to the producer callback, or succeeds
   *         with a `Result` pairing the record metadata with the original record
   */
  def send(record: ProducerRecord[K, V]): Future[Result[K, V, NotUsed]] = {
    val outcome = Promise[Result[K, V, NotUsed]]()

    producer.send(
      record,
      new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          if (exception != null) outcome.failure(exception)
          else outcome.success(Result(metadata, ProducerMessage.Message(record, NotUsed)))
      }
    )

    outcome.future
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:28,代码来源:SimpleProducer.scala


示例9: KafkaLogSink

//设置package包名称以及导入依赖的类
package io.neons.collector.infrastructure.log.sink

import java.util.UUID

import com.google.inject.Inject
import io.neons.collector.application.config.CollectorConfig
import io.neons.collector.model.log.{Log, LogSink}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

import scala.concurrent.{Future, Promise}

/**
 * [[LogSink]] that publishes each log entry to the Kafka topic configured in
 * `collectorConfig.sink.kafkaSinkConfig.topic`, keyed by the log's request UUID.
 */
class KafkaLogSink @Inject()(kafkaProducer: KafkaProducer[UUID, Log], collectorConfig: CollectorConfig) extends LogSink {

  /**
   * Sends `log` to Kafka.
   *
   * @param log entry to publish; `log.requestUuidL` is parsed with `UUID.fromString`
   * @return future holding the string form of the record metadata on success, or failed with
   *         the exception reported by the producer callback
   */
  def sendToSink(log: Log): Future[String] = {
    val promise = Promise[String]()

    kafkaProducer.send(
      new ProducerRecord[UUID, Log](collectorConfig.sink.kafkaSinkConfig.topic, UUID.fromString(log.requestUuidL), log),
      (md: RecordMetadata, e: Exception) => {
        // Decide on the exception, not on the metadata: the callback contract signals failure
        // through a non-null exception, while the metadata may be a non-null placeholder on error.
        Option(e) match {
          case Some(error) => promise.failure(error)
          case None        => promise.success(md.toString)
        }
      }
    )

    promise.future
  }
}
开发者ID:NeonsIo,项目名称:collector,代码行数:29,代码来源:KafkaLogSink.scala


示例10: FailedEvent

//设置package包名称以及导入依赖的类
package com.ovoenergy.delivery.service.kafka.process

import com.ovoenergy.comms.model.email.ComposedEmailV2
import com.ovoenergy.comms.model.sms.ComposedSMSV2
import com.ovoenergy.comms.model.{Failed, FailedV2, MetadataV2}
import com.ovoenergy.delivery.service.domain.DeliveryError
import com.ovoenergy.delivery.service.logging.LoggingWithMDC
import org.apache.kafka.clients.producer.RecordMetadata

import scala.concurrent.{ExecutionContext, Future}

/**
 * Builds and publishes `FailedV2` events for comms that could not be delivered,
 * logging the partition and offset once the publish future succeeds.
 */
object FailedEvent extends LoggingWithMDC {

  /** Publishes a `FailedV2` derived from a composed email and the delivery error. */
  def email(publishEvent: FailedV2 => Future[RecordMetadata])(
      composedEvent: ComposedEmailV2,
      deliveryError: DeliveryError)(implicit ec: ExecutionContext): Future[RecordMetadata] =
    publishFailed(
      publishEvent,
      FailedV2(
        metadata = MetadataV2.fromSourceMetadata("delivery-service", composedEvent.metadata),
        internalMetadata = composedEvent.internalMetadata,
        reason = deliveryError.description,
        errorCode = deliveryError.errorCode
      )
    )

  /** Publishes a `FailedV2` derived from a composed SMS and the delivery error. */
  def sms(publishEvent: FailedV2 => Future[RecordMetadata])(
      composedEvent: ComposedSMSV2,
      deliveryError: DeliveryError)(implicit ec: ExecutionContext): Future[RecordMetadata] =
    publishFailed(
      publishEvent,
      FailedV2(
        metadata = MetadataV2.fromSourceMetadata("delivery-service", composedEvent.metadata),
        internalMetadata = composedEvent.internalMetadata,
        reason = deliveryError.description,
        errorCode = deliveryError.errorCode
      )
    )

  /** Publishes `failed`, then logs it together with the record's partition and offset. */
  private def publishFailed(publishEvent: FailedV2 => Future[RecordMetadata], failed: FailedV2)(
      implicit ec: ExecutionContext): Future[RecordMetadata] =
    publishEvent(failed).map { record =>
      logInfo(failed,
              s"Publishing Failed event: ${failed.errorCode} - ${failed.reason} - ${record.partition}/${record.offset}")
      record
    }

}
开发者ID:ovotech,项目名称:comms-delivery-service,代码行数:49,代码来源:FailedEvent.scala


示例11: IssuedForDeliveryEvent

//设置package包名称以及导入依赖的类
package com.ovoenergy.delivery.service.kafka.process

import com.ovoenergy.comms.model.email.ComposedEmailV2
import com.ovoenergy.comms.model._
import com.ovoenergy.comms.model.sms.ComposedSMSV2
import com.ovoenergy.delivery.service.domain.GatewayComm
import com.ovoenergy.delivery.service.logging.LoggingWithMDC
import org.apache.kafka.clients.producer.RecordMetadata

import scala.concurrent.{ExecutionContext, Future}

/**
 * Builds and publishes `IssuedForDeliveryV2` events for comms handed to a gateway,
 * logging the partition and offset once the publish future succeeds.
 */
object IssuedForDeliveryEvent extends LoggingWithMDC {

  /** Publishes an `IssuedForDeliveryV2` derived from a composed email and the gateway response. */
  def email(publishEvent: IssuedForDeliveryV2 => Future[RecordMetadata])(
      composedEvent: ComposedEmailV2,
      gatewayComm: GatewayComm)(implicit ec: ExecutionContext): Future[RecordMetadata] =
    publishIssued(
      publishEvent,
      IssuedForDeliveryV2(
        metadata = MetadataV2.fromSourceMetadata("delivery-service", composedEvent.metadata),
        internalMetadata = composedEvent.internalMetadata,
        channel = gatewayComm.channel,
        gateway = gatewayComm.gateway,
        gatewayMessageId = gatewayComm.id
      )
    )

  /** Publishes an `IssuedForDeliveryV2` derived from a composed SMS and the gateway response. */
  def sms(publishEvent: IssuedForDeliveryV2 => Future[RecordMetadata])(
      composedEvent: ComposedSMSV2,
      gatewayComm: GatewayComm)(implicit ec: ExecutionContext): Future[RecordMetadata] =
    publishIssued(
      publishEvent,
      IssuedForDeliveryV2(
        metadata = MetadataV2.fromSourceMetadata("delivery-service", composedEvent.metadata),
        internalMetadata = composedEvent.internalMetadata,
        channel = gatewayComm.channel,
        gateway = gatewayComm.gateway,
        gatewayMessageId = gatewayComm.id
      )
    )

  /** Publishes `issued`, then logs it together with the record's partition and offset. */
  private def publishIssued(publishEvent: IssuedForDeliveryV2 => Future[RecordMetadata],
                            issued: IssuedForDeliveryV2)(
      implicit ec: ExecutionContext): Future[RecordMetadata] =
    publishEvent(issued).map { record =>
      logInfo(
        issued,
        s"Published IssuedForDelivery event: ${issued.gateway} - ${issued.gatewayMessageId} - ${record.partition}/${record.offset}")
      record
    }

}
开发者ID:ovotech,项目名称:comms-delivery-service,代码行数:53,代码来源:IssuedForDeliveryEvent.scala


示例12: FailedEventSpec

//设置package包名称以及导入依赖的类
package com.ovoenergy.delivery.service.kafka.process

import java.time.Clock

import com.ovoenergy.comms.model._
import com.ovoenergy.comms.model.email.ComposedEmailV2
import com.ovoenergy.delivery.service.domain._
import com.ovoenergy.delivery.service.util.ArbGenerator
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalacheck.Shapeless._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * Checks that `FailedEvent.email` builds the expected `FailedV2` from a composed email
 * and a delivery error, using a stub publisher that records the event it receives.
 */
class FailedEventSpec extends FlatSpec with Matchers with ArbGenerator with GeneratorDrivenPropertyChecks {

  private implicit val clock = Clock.systemUTC()

  private val composedEmail        = generate[ComposedEmailV2]
  private var failedEventPublished = Option.empty[FailedV2]

  // Stub publisher: remembers the event and answers with fixed, already-completed metadata.
  private val publishEvent = (failed: FailedV2) => {
    failedEventPublished = Some(failed)
    val stubMetadata =
      new RecordMetadata(new TopicPartition("", 1), 1L, 1L, 1L, java.lang.Long.valueOf(1), 1, 1)
    Future.successful(stubMetadata)
  }

  "FailedEvent" should "process failed email" in {
    val deliveryError = APIGatewayUnspecifiedError(EmailGatewayError)

    FailedEvent.email(publishEvent)(composedEmail, deliveryError)

    // The stub is invoked synchronously, so the captured event is available right away.
    val published = failedEventPublished.get
    published.metadata.traceToken shouldBe composedEmail.metadata.traceToken
    published.metadata.source shouldBe "delivery-service"
    published.errorCode shouldBe deliveryError.errorCode
    published.reason shouldBe deliveryError.description
    published.internalMetadata shouldBe composedEmail.internalMetadata
  }

}
开发者ID:ovotech,项目名称:comms-delivery-service,代码行数:39,代码来源:FailedEventSpec.scala


示例13: IssuedForDeliveryEventSpec

//设置package包名称以及导入依赖的类
package com.ovoenergy.delivery.service.kafka.process

import java.time.Clock

import com.ovoenergy.comms.model.IssuedForDeliveryV2
import com.ovoenergy.comms.model.email.ComposedEmailV2
import com.ovoenergy.delivery.service.domain.GatewayComm
import com.ovoenergy.delivery.service.util.ArbGenerator
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalacheck.Shapeless._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * Checks that `IssuedForDeliveryEvent.email` builds the expected `IssuedForDeliveryV2` from a
 * composed email and a gateway response, using a stub publisher that records the event it receives.
 */
class IssuedForDeliveryEventSpec extends FlatSpec with Matchers with ArbGenerator with GeneratorDrivenPropertyChecks {

  private implicit val clock = Clock.systemUTC()

  private val gatewayComm                     = generate[GatewayComm]
  private val composedEmail                   = generate[ComposedEmailV2]
  private var issuedForDeliveryEventPublished = Option.empty[IssuedForDeliveryV2]

  // Stub publisher: remembers the event and answers with fixed, already-completed metadata.
  private val publishEvent = (issuedForDelivery: IssuedForDeliveryV2) => {
    issuedForDeliveryEventPublished = Some(issuedForDelivery)
    val stubMetadata = new RecordMetadata(new TopicPartition("", 1), 1L, 1L, 1L, 1L, 1, 1)
    Future.successful(stubMetadata)
  }

  "IssuedForDeliveryEvent" should "process an issued email" in {
    IssuedForDeliveryEvent.email(publishEvent)(composedEmail, gatewayComm)

    // The stub is invoked synchronously, so the captured event is available right away.
    val published = issuedForDeliveryEventPublished.get
    published.metadata.traceToken shouldBe composedEmail.metadata.traceToken
    published.metadata.source shouldBe "delivery-service"
    published.gatewayMessageId shouldBe gatewayComm.id
    published.gateway shouldBe gatewayComm.gateway
    published.internalMetadata shouldBe composedEmail.internalMetadata
    published.channel shouldBe gatewayComm.channel
  }

}
开发者ID:ovotech,项目名称:comms-delivery-service,代码行数:41,代码来源:IssuedForDeliveryEventSpec.scala


示例14: Bidder

//设置package包名称以及导入依赖的类
package bidding.client.console

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

/**
 * Console bidder that publishes ever-increasing bids for one item to the
 * "my-replicated-topic" topic.
 *
 * @param producer   producer used to publish the bids
 * @param itemId     item being bid on; used as the record key
 * @param startPrice price sent with the first bid
 */
class Bidder(producer: KafkaProducer[String, String], itemId: String, startPrice: BigDecimal) {
  // Price carried by the next bid; raised after every call to `bid`.
  private var lastPrice: BigDecimal = startPrice

  private val callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      // A non-null exception means the send failed. Report it instead of printing "published"
      // (and instead of dereferencing metadata that may be null in that case).
      Option(exception) match {
        case Some(error) => Console.err.println(s"failed to publish bid for item $itemId: $error")
        case None        => println("published: " + metadata.toString)
      }
  }

  /** Publishes the current price for `itemId`, then raises it by a random amount in [0, 2). */
  def bid(): Unit = {
    val producerRecord = new ProducerRecord("my-replicated-topic", itemId, lastPrice.toString())
    producer.send(producerRecord, callback)
    lastPrice += BigDecimal(scala.util.Random.nextDouble * 2)
  }
}
开发者ID:oleksandr-iskhakov,项目名称:bidding-server,代码行数:20,代码来源:Bidder.scala


示例15: RecordCallback

//设置package包名称以及导入依赖的类
package articlestreamer.shared.kafka

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

/**
 * Kafka producer callback that logs the outcome of a send: successes at info level,
 * failures at error level together with the exception.
 */
class RecordCallback extends Callback with LazyLogging {

  /** A non-null `ex` is passed to `handleException`; otherwise the metadata is logged. */
  override def onCompletion(metadata: RecordMetadata, ex: Exception) =
    Option(ex).fold(logger.info(s"Successfully sent message : $metadata"))(handleException)

  /** Logs the failure with its exception. */
  private def handleException(exception: Exception): Unit =
    logger.error("Error while attempting to send message", exception)
}
开发者ID:firens,项目名称:article-streamer,代码行数:20,代码来源:RecordCallback.scala


示例16: RecordCallbackSpec

//设置package包名称以及导入依赖的类
package articlestreamer.aggregator.kafka

import articlestreamer.shared.BaseSpec
import articlestreamer.shared.kafka.RecordCallback
import com.github.ghik.silencer.silent
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition


/**
 * Smoke tests for `RecordCallback`: each case only invokes `onCompletion` and makes no
 * assertion on the output, so a case passes as long as the call does not throw.
 */
class RecordCallbackSpec extends BaseSpec {

  // Callback instance shared by both cases.
  val recordCallback = new RecordCallback

  //At the moment purely for coverage purpose
  // Failure path: no metadata, only an exception.
  "If any exception occurs" should "log it" in {
    recordCallback.onCompletion(null, new RuntimeException())
  }

  //At the moment purely for coverage purpose
  // Success path: metadata present, no exception.
  "If successfully sends record" should "log it" in {
    // The `@silent` ascription and the IDE directive below go with this constructor call
    // (the directive names a deprecation inspection); keep them attached to it.
    //noinspection ScalaDeprecation
    val metadata = new RecordMetadata(new TopicPartition("", 0), 0l, 1l) : @silent
    recordCallback.onCompletion(metadata, null)
  }

} 
开发者ID:firens,项目名称:article-streamer,代码行数:27,代码来源:RecordCallbackSpec.scala


示例17: KProducer

//设置package包名称以及导入依赖的类
package org.parsec

import java.util.Properties
import java.util.concurrent.Future

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Convenient Kafka producer using avro4s
/**
 * Convenient Kafka producer using avro4s: keys and values are converted to Avro
 * `GenericRecord`s before being sent.
 *
 * @tparam K key type
 * @tparam V value type
 */
class KProducer[K <: Product, V <: Product] {

  val kafkaProps = new Properties()

  // Settings are applied in the listed order; key and value both use the Avro serializer.
  Seq(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "parsec.playground.landoop.com:49092",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer].getCanonicalName,
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer].getCanonicalName,
    "schema.registry.url" -> "http://parsec.playground.landoop.com:48081"
  ).foreach { case (name, setting) => kafkaProps.put(name, setting) }

  // Lazy: the producer is only constructed on first access.
  private lazy val producer  = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)

  /**
   * Converts `key` and `value` with `RecordFormat` and sends them to `topic`.
   *
   * @param topic     destination topic
   * @param key       record key
   * @param value     record value
   * @param partition target partition, 0 by default
   * @return the Java future returned by the underlying producer
   */
  def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
    val avroKey = RecordFormat[K].to(key)
    val avroValue = RecordFormat[V].to(value)
    producer.send(new ProducerRecord[GenericRecord, GenericRecord](topic, partition, avroKey, avroValue))
  }

}
开发者ID:parsec-network,项目名称:parsec,代码行数:29,代码来源:KProducer.scala



注:本文中的org.apache.kafka.clients.producer.RecordMetadata类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala JsonTypeInfo类代码示例发布时间:2022-05-23
下一篇:
Scala ServerBinding类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap