• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ElasticClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中com.sksamuel.elastic4s.ElasticClient的典型用法代码示例。如果您正苦于以下问题:Scala ElasticClient类的具体用法?Scala ElasticClient怎么用?Scala ElasticClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ElasticClient类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: FundsService

//设置package包名称以及导入依赖的类
package models

import javax.inject.Inject

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.common.settings.Settings
import play.api.Logger

import scala.collection.mutable
import scala.util.{Failure, Success, Try}

@javax.inject.Singleton
class FundsService @Inject() (
  configuration: play.api.Configuration
) {

  val settings = Settings.settingsBuilder()
    .put("http.enabled", false)
    .put("path.home", configuration.underlying.getString("elastic.data.path"))
  val client = ElasticClient.local(settings.build)

  Try(insertBlocking(Funds(mutable.Buffer.empty))) match {
    case Success(_) => ()

    case Failure(cause) =>
      Logger.error(s"Could not create index - " + cause)
  }

  def find(filter: String, offset: Int, pageSize: Int) = {
    client.execute { search in "gov" / "funds" query filter from offset size pageSize }
  }

  def findBlocking(filter: String, offset: Int, pageSize: Int) = {
    client.execute { search in "gov" / "funds" query filter from offset size pageSize }.await
  }

  def insert(funds: Funds) = {
    client.execute { index into "gov" / "funds" fields funds.data }
  }

  def insertBlocking(funds: Funds) = {
    client.execute { index into "gov" / "funds" fields funds.data }.await
  }

  def deleteAll() = {
    client.execute { deleteIndex("gov") }
  }
}

case class Funds(data: mutable.Buffer[(String, String)]) 
开发者ID:radusw,项目名称:eu_funds,代码行数:52,代码来源:FundsService.scala


示例2: MsgToElastic

//设置package包名称以及导入依赖的类
package Birdcage


import java.util.Date
import com.sksamuel.elastic4s.ElasticDsl._
import akka.actor.Actor
import com.sksamuel.elastic4s.{ElasticClient, IndexAndType, IndexAndTypes, IndexResult}
import com.typesafe.scalalogging.LazyLogging
import org.eclipse.paho.client.mqttv3.MqttMessage
import scala.concurrent.ExecutionContext.Implicits.global
import concurrent.Future
import scala.util.{Failure, Success}

object MsgToElastic {
  case class MsgMqtt(topic: String, message: MqttMessage, esClient: ElasticClient, indexTypeEs: Tuple2[String, String])
}

class MsgToElastic extends Actor with LazyLogging {

  import MsgToElastic._

  def receive = {
    case MsgMqtt(topic, message, esClient, indexTypeEs) => sentToElastic(topic, message, esClient, indexTypeEs)
    case _ => logger.debug("MsgActor received unsupported message")
  }

  def sentToElastic(topic: String, message: MqttMessage, esClient: ElasticClient, indexTypeEs: Tuple2[String, String]) = {

    val indexType = new IndexAndType(indexTypeEs._1.toLowerCase, indexTypeEs._2.toLowerCase())
    val messageVal = s"%s".format(message)
    val timestamp = new Date().toString

    val resp: Future[IndexResult] = {
      esClient.execute {
        index into IndexAndTypes.apply(indexType)  fields(
          "postcode"   -> topic.takeWhile(_ != '/').toLowerCase(),
          "huisnummer" -> topic.dropWhile(_ != '/').drop(1).takeWhile(_ != '/'),
          "timestamp"  -> timestamp,
          "subject"    -> topic.reverse.takeWhile(_ != '/').reverse,
          "value"      -> messageVal
          )
      }
    }
    resp.onComplete
    {
      case Success(res) => {
        val bufferString = "Msg to Es : %s, %s, %s\n %s\n".format(timestamp, topic, messageVal, resp)
        logger.info(bufferString)
      }
      case Failure(e) => logger.info("An error has occured while sending to Elastic")
    }
  }
} 
开发者ID:TreeWIFI,项目名称:aviary,代码行数:54,代码来源:MsgToElastic.scala


示例3: CreateIndex

//设置package包名称以及导入依赖的类
package com.datamountaineer.streamreactor.connect.elastic.indexname

import com.datamountaineer.connector.config.Config
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse

import scala.concurrent.Future


object CreateIndex {
  def apply(config: Config)(implicit client: ElasticClient): Future[CreateIndexResponse] = {
    require(config.isAutoCreate, s"Auto-creating indexes hasn't been enabled for target:${config.getTarget}")

    val indexName = getIndexName(config)
    client.execute {
      Option(config.getDocType) match {
        case None => create index indexName
        case Some(documentType) => create index indexName mappings (mapping(documentType))
      }
    }
  }

  def getIndexName(config: Config): String = {
    Option(config.getIndexSuffix).fold(config.getTarget) { indexNameSuffix =>
      s"${config.getTarget}${CustomIndexName.parseIndexName(indexNameSuffix)}"
    }
  }
} 
开发者ID:datamountaineer,项目名称:stream-reactor,代码行数:30,代码来源:CreateIndex.scala


示例4: SpecificationConstants

//设置package包名称以及导入依赖的类
package com.clemble.query

import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.mappings.FieldType.{IntegerType, StringType}
import com.sksamuel.elastic4s.{ElasticsearchClientUri, ElasticClient}
import reactivemongo.api.MongoDriver

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case object SpecificationConstants {

  val db = Await.result(MongoDriver().connection(List("localhost:27017")).database("test"), 1 minute)

  val client: ElasticClient = {
    val uri = ElasticsearchClientUri("localhost", 9300)
    val client = ElasticClient.transport(uri)
    initClient(client)
    client
  }

  private def initClient(client: ElasticClient) = {
    val needToRemove = client.execute(indexExists("test")).map(_.isExists()).await
    if (needToRemove) {
      require(client.execute(deleteIndex("test")).await().isAcknowledged())
    }

    val nameMapping = mapping("employee").fields(
      field("name", StringType).index("not_analyzed"),
      field("salary", IntegerType)
    )
    val createCommand = create index "test" mappings (nameMapping)

    val createResponse = client.execute(createCommand).await
    createResponse.isAcknowledged()
  }

} 
开发者ID:clemble,项目名称:scala-query-dsl,代码行数:40,代码来源:SpecificationConstants.scala


示例5:

//设置package包名称以及导入依赖的类
package com.puroguramingu.util

import java.io.File
import java.util.UUID

import com.sksamuel.elastic4s.ElasticClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.NodeBuilder
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}

trait ElasticSearch extends BeforeAndAfterEach with BeforeAndAfterAll {
  this: Suite =>

  private val tempFile = File.createTempFile("elasticsearchtests", "tmp")
  private val homeDir = new File(tempFile.getParent + "/" + UUID.randomUUID().toString)

  homeDir.mkdir()
  homeDir.deleteOnExit()
  tempFile.deleteOnExit()

  protected val settings = Settings.settingsBuilder()
    .put("node.http.enabled", true)
    .put("http.enabled", true)
    .put("index.store.type", "niofs")
    .put("path.home", homeDir.getAbsolutePath)
    .put("index.number_of_shards", 1)
    .put("index.number_of_replicas", 0)
    .put("es.logger.level", "DEBUG")
    .put("http.port", 9500)
    .put("transport.tcp.port", 9400)
    .build()

  val node = NodeBuilder.nodeBuilder().settings(settings).local(false).build().start()

  val client = ElasticClient.fromNode(node)

} 
开发者ID:mdymczyk,项目名称:spark-es-writer,代码行数:38,代码来源:ElasticSearch.scala


示例6: ApplicationModule

//设置package包名称以及导入依赖的类
package com.pharmpress.dmdbrowser

import com.google.inject.AbstractModule
import com.pharmpress.elasticsearch.{Elastic4sServiceImpl, IElasticSearch}
import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
import org.elasticsearch.common.settings.{ImmutableSettings, Settings}
import play.api.{Configuration, Environment, Logger}


class ApplicationModule(
                         environment: Environment,
                         configuration: Configuration
                       ) extends AbstractModule {

  def configure: Unit = {

    val conf = configuration.underlying
    val rpsConfigUri = conf.getString("es.rps.client.uri")
    val rpsConfigCluster = conf.getString("es.rps.cluster.name")

    bind(classOf[IElasticSearch]) toInstance new Elastic4sServiceImpl(() => {
      Logger.info(s"Connecting to Elasticsearch at $rpsConfigUri with cluster $rpsConfigCluster")
      ElasticClient.remote(
        ImmutableSettings.settingsBuilder()
          .classLoader(classOf[Settings].getClassLoader)
          .put("cluster.name", rpsConfigCluster)
          .build(),
        ElasticsearchClientUri(rpsConfigUri)
      )
    })
  }
} 
开发者ID:pharmpress,项目名称:DmdBrowserScalaJs,代码行数:33,代码来源:ApplicationModule.scala


示例7: LogHandler

//设置package包名称以及导入依赖的类
package actors.stateless

import akka.actor.Actor
import akka.event.Logging._
import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}


class LogHandler extends Actor{
  val uri = ElasticsearchClientUri("elasticsearch://127.0.0.1:9300")
  lazy val esClient = ElasticClient.transport(uri)

  def receive = {
    case InitializeLogger(_) =>
      sender() ! LoggerInitialized

    case [email protected] Error(cause, logSource, logClass, message) =>
      println(s"DEBUG: [$logSource - $logClass] $message ${e.mdc}")
      persistElasticsearch(e)

    case [email protected] Warning(logSource, logClass, message) =>
      println(s"DEBUG: [$logSource - $logClass] $message ${w.mdc}")
      persistElasticsearch(w)

    case [email protected] Info(logSource, logClass, message) =>
      println(s"DEBUG: [$logSource - $logClass] $message ${i.mdc}")
      persistElasticsearch(i)

    case [email protected] Debug(logSource, logClass, message) =>
      println(s"DEBUG: [$logSource - $logClass] $message ${d.mdc}")
      persistElasticsearch(d)
  }

  def persistElasticsearch(logEvent: LogEvent): Unit = {
    import com.sksamuel.elastic4s.ElasticDsl._
    import com.sksamuel.elastic4s.jackson.ElasticJackson
    import ElasticJackson.Implicits._

    import scala.concurrent.ExecutionContext.Implicits.global
    esClient.execute {
      index into "actor" / "Log" source logEvent id logEvent.timestamp
    }.map { t =>
      println(s"Persisted to Elastic: $t")
    }.onFailure{
      case t : Throwable => println(t, "Persisted to Elastic Exception: $t")
    }
  }
} 
开发者ID:GoranSchumacher,项目名称:massive-actors,代码行数:48,代码来源:LogHandler.scala


示例8: ESPersistence

//设置package包名称以及导入依赖的类
package com.stratio.ioft.persistence

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.stratio.ioft.settings.IOFTConfig
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.common.settings.Settings

import scala.collection.JavaConversions._

object ESPersistence extends IOFTConfig {

  val settings = Settings.settingsBuilder()

  esConfig.entrySet().map {
    entry =>
      settings.put(entry.getKey, esConfig.getAnyRef(entry.getKey))
  }

  val client = ElasticClient.local(settings.build)

  def persist(dStream: DStream[(String, String)]) ={
    dStream.foreachRDD{ rdd =>
      client.execute{ index into "ioft" / "drone" fields rdd.collect()}
    }
  }

} 
开发者ID:pfcoperez,项目名称:sparkstream_ioft,代码行数:29,代码来源:ESPersistence.scala


示例9: KafkaToRestElasticsearchAggregatorFLow

//设置package包名称以及导入依赖的类
package com.pragmasoft.eventaggregator.streams

import akka.NotUsed
import akka.actor.ActorSystem
import com.pragmasoft.eventaggregator.ActorSystemProvider
import com.pragmasoft.eventaggregator.GenericRecordEventJsonConverter.EventHeaderDescriptor
import com.sksamuel.elastic4s.ElasticClient
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient

class KafkaToRestElasticsearchAggregatorFLow(
    override val kafkaConfig: KafkaPublisherConfig,
    override val elasticSearchIndexPrefix: String,
    override val elasticSearchConnectionUrl: String,
    override implicit val actorSystem: ActorSystem,
    override val schemaRegistry: SchemaRegistryClient,
    override val headerDescriptor: EventHeaderDescriptor,
    override val esWriterActorDispatcher: String
  )
  extends EventAggregatorFlow[NotUsed]
  with KafkaSourceProvider
  with RestElasticsearchEventSinkProvider
  with ActorSystemProvider
  with DayPartitionedElasticSearchIndexNameProvider
  with LazyLogging {
}


class KafkaToNativeElasticsearchAggregatorFLow(
     override val kafkaConfig: KafkaPublisherConfig,
     override val elasticSearchIndexPrefix: String,
     override implicit val actorSystem: ActorSystem,
     override val schemaRegistry: SchemaRegistryClient,
     override val headerDescriptor: EventHeaderDescriptor,
     override val elasticSearchClient: ElasticClient
   )
  extends EventAggregatorFlow[NotUsed]
    with KafkaSourceProvider
    with ElasticsearchEventSinkProvider
    with ActorSystemProvider
    with DayPartitionedElasticSearchIndexNameProvider
    with LazyLogging {
} 
开发者ID:galarragas,项目名称:event-aggregator,代码行数:44,代码来源:KafkaToElasticsearchAggregatorFLow.scala


示例10: esBefore

//设置package包名称以及导入依赖的类
package com.nathankleyn.specs2.elasticsearch

import java.net.ServerSocket

import com.sksamuel.elastic4s.ElasticClient
import org.elasticsearch.client.Client
import org.specs2.specification.BeforeAfterAll

trait ElasticsearchSpec extends BeforeAfterAll {
  val esPort: Int = findFreePort
  val esJavaPort: Int = findFreePort

  private lazy val esNode = EmbeddedElasticsearch(esPort, esJavaPort)
  lazy val esClusterName = esNode.clusterName
  lazy val esClient: Client = esNode.client
  lazy val e4sClient: ElasticClient = ElasticClient.fromClient(esClient)

  // We have seperate methods here from beforeAll/afterAll because if you want to mixin multiple
  // Before/AfterAll traits you are going to need to call these directly.
  def esBefore(): Unit = esNode.start()
  def esAfter(): Unit = esNode.stop()

  def beforeAll(): Unit = esBefore()
  def afterAll(): Unit = esAfter()

  
  private def findFreePort: Int = {
    val ss = new ServerSocket(0)
    val port = ss.getLocalPort
    ss.close()
    port
  }
} 
开发者ID:nathankleyn,项目名称:specs2-elasticsearch,代码行数:34,代码来源:ElasticsearchSpec.scala


示例11: ElasticsearchLog

//设置package包名称以及导入依赖的类
package com.rxthings.furnace.logs

import com.rxthings.furnace.Events.{Reading, ReadingFailure}
import com.rxthings.furnace.FurnaceLog
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._
import com.typesafe.scalalogging.LazyLogging



class ElasticsearchLog extends FurnaceLog with LazyLogging {
    val elastic = ElasticClient.local

    def receive: Receive = {
        case reading: Reading => elastic.execute {
            index into "temperature" / "readings" source reading
        }
        case failure: ReadingFailure => elastic.execute {
            index into "temperature" / "failures" source failure
        }
    }
} 
开发者ID:jw3,项目名称:proto-furnace,代码行数:24,代码来源:ElasticsearchLog.scala



注:本文中的com.sksamuel.elastic4s.ElasticClient类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala AuthElement类代码示例发布时间:2022-05-23
下一篇:
Scala HttpRequest类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap