• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python app.get_options函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twitter.common.app.get_options函数的典型用法代码示例。如果您正苦于以下问题:Python get_options函数的具体用法?Python get_options怎么用?Python get_options使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_options函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_app_add_command_options

  def test_app_add_command_options(self):
    # Checks that an option attached to a command only shows up on
    # app.get_options() once app.add_command_options() is called for it.
    dest = 'test_option_name'

    @app.command_option(options.TwitterOption('--test', dest=dest))
    def test_command():
      pass

    # Decorating alone is expected to leave the parsed options untouched.
    assert not hasattr(app.get_options(), dest)

    app.add_command_options(test_command)
    assert hasattr(app.get_options(), dest)
开发者ID:BabyDuncan,项目名称:commons,代码行数:11,代码来源:test_app.py


示例2: __init__

  def __init__(self,
               servers=None,
               timeout_secs=None,
               watch=None,
               max_reconnects=MAX_RECONNECTS,
               logger=log.debug):
    """Create new ZooKeeper object.

    Blocks until ZK negotiation completes, or the timeout expires. By default
    only tries to connect once.  Use a larger 'max_reconnects' if you want
    to be resilient to things such as DNS outages/changes.

    If watch is set to a function, it is called whenever the global
    zookeeper watch is dispatched using the same function signature, with the
    exception that this object is used in place of the zookeeper handle.

    A falsy 'servers' or 'timeout_secs' falls back to a default: the class
    constants DEFAULT_ENSEMBLE / DEFAULT_TIMEOUT_SECONDS, or, when WITH_APP is
    set, the 'zookeeper' / 'zookeeper_timeout' application options.
    """

    fallback_servers, fallback_timeout = self.DEFAULT_ENSEMBLE, self.DEFAULT_TIMEOUT_SECONDS
    if WITH_APP:
      # Application options take precedence over the class-level defaults.
      app_options = app.get_options()
      fallback_servers = app_options.zookeeper
      fallback_timeout = app_options.zookeeper_timeout

    # Caller-supplied configuration.
    self._servers = servers if servers else fallback_servers
    self._timeout_secs = timeout_secs if timeout_secs else fallback_timeout
    self._watch = watch
    self._logger = logger
    self._max_reconnects = max_reconnects

    # Connection bookkeeping; nothing is connected until reconnect() runs.
    self._zh = None
    self._init_count = 0
    self._live = threading.Event()
    self._stopped = threading.Event()
    self._completions = Queue()

    # Must come last: every attribute above is assigned before connecting.
    self.reconnect()
开发者ID:JoeEnnever,项目名称:commons,代码行数:34,代码来源:client.py


示例3: list_jobs

def list_jobs(cluster_and_role):
  """usage: list_jobs [--show-cron] cluster/role/env/job

  Shows all jobs that match the job-spec known by the scheduler.
  If --show-cron is specified, then also shows the registered cron schedule.
  """
  # NOTE(review): the docstring above is kept verbatim because it reads as
  # user-facing usage text; the code below accepts exactly "cluster/role".

  # Both printers close over `options` and `cluster`, which are bound further
  # down, before either printer is ever called.
  def print_plain(job):
    # One line per job; cron details are appended only when requested.
    template = '{0}/{1.key.role}/{1.key.environment}/{1.key.name}'
    if options.show_cron_schedule:
      template = template + '\t\'{1.cronSchedule}\'\t{1.cronCollisionPolicy}'
    print(template.format(cluster, job))

  def print_detailed(job):
    # Multi-line form: job path header followed by its cron settings.
    job_path = (cluster, job.key.role, job.key.environment, job.key.name)
    print("Job %s/%s/%s/%s:" % job_path)
    print('\tcron schedule: %s' % job.cronSchedule)
    print('\tcron policy:   %s' % job.cronCollisionPolicy)

  options = app.get_options()
  # The detailed layout is used only when both flags are set.
  print_fn = print_detailed if (options.show_cron_schedule and options.pretty) else print_plain

  # The argument must contain exactly one '/' separating cluster from role.
  if cluster_and_role.count('/') != 1:
    die('list_jobs parameter must be in cluster/role format')
  cluster, role = cluster_and_role.split('/')

  resp = make_client(cluster).get_jobs(role)
  check_and_log_response(resp)
  for job_config in resp.result.getJobsResult.configs:
    print_fn(job_config)
开发者ID:sumanau7,项目名称:incubator-aurora,代码行数:33,代码来源:core.py


示例4: make_admin_client_with_options

def make_admin_client_with_options(cluster):
  """Build an admin client for `cluster`, configured from the parsed app options.

  The client is verbose only when the options carry verbosity == 'verbose';
  a missing 'verbosity' attribute is treated as 'normal'.
  """
  opts = app.get_options()
  is_verbose = getattr(opts, 'verbosity', 'normal') == 'verbose'
  skip_redirect = opts.bypass_leader_redirect
  return make_admin_client(
      cluster=cluster,
      verbose=is_verbose,
      bypass_leader_redirect=skip_redirect)
开发者ID:bmhatfield,项目名称:aurora,代码行数:7,代码来源:admin.py


示例5: inspect

def inspect(job_spec, config_file):
  """usage: inspect cluster/role/env/job config

  Verifies that a job can be parsed from a configuration file, and displays
  the parsed configuration.
  """
  options = app.get_options()

  # Build the equivalent new-style command line for the deprecation notice.
  newcmd = ["job", "inspect", job_spec, config_file]
  if options.json:
    newcmd.append("--read-json")
  v1_deprecation_warning("inspect", newcmd)

  config = get_job_config(job_spec, config_file, options)
  if options.raw:
    # Raw mode: dump the converted job object and skip the formatted report.
    print('Parsed job config: %s' % config.job())
    return

  # config.job() is called once here; the original assigned job_thrift twice,
  # the second call being redundant.
  # NOTE(review): assumes config.job() has no side effects - confirm.
  job_thrift = config.job()
  job = config.raw()

  print('Job level information')
  print('  name:       %s' % job.name())
  print('  role:       %s' % job.role())
  print('  contact:    %s' % job.contact())
  print('  cluster:    %s' % job.cluster())
  print('  instances:  %s' % job.instances())
  if job.has_cron_schedule():
    print('  cron:')
    print('     schedule: %s' % job.cron_schedule())
    print('     policy:   %s' % job.cron_collision_policy())
  if job.has_constraints():
    print('  constraints:')
    for constraint, value in job.constraints().get().items():
      print('    %s: %s' % (constraint, value))
  # The service flag is read from the converted job, not from the raw config.
  print('  service:    %s' % job_thrift.taskConfig.isService)
  print('  production: %s' % bool(job.production().get()))
  print()

  task = job.task()
  print('Task level information')
  print('  name: %s' % task.name())
  if len(task.constraints().get()) > 0:
    print('  constraints:')
    for constraint in task.constraints():
      # Each constraint is rendered as an ordering chain: "a < b < c".
      print('    %s' % (' < '.join(st.get() for st in constraint.order())))
  print()

  for process in task.processes():
    print('Process %s:' % process.name())
    # Flags are listed only when set.
    if process.daemon().get():
      print('  daemon')
    if process.ephemeral().get():
      print('  ephemeral')
    if process.final().get():
      print('  final')
    print('  cmdline:')
    for line in process.cmdline().get().splitlines():
      print('    ' + line)
    print()
开发者ID:kevinburg,项目名称:incubator-aurora,代码行数:60,代码来源:core.py


示例6: disk_log_scheme

 def disk_log_scheme():
   """
     Return the current disk log scheme, initializing it on first use.
   """
   if LogOptions._DISK_LOG_SCHEME is not None:
     return LogOptions._DISK_LOG_SCHEME
   # Nothing stored yet: apply the disk log level taken from the app options.
   # set_disk_log_level is presumably what populates _DISK_LOG_SCHEME - confirm.
   LogOptions.set_disk_log_level(app.get_options().twitter_common_log_disk_log_level)
   return LogOptions._DISK_LOG_SCHEME
开发者ID:billwei,项目名称:commons,代码行数:7,代码来源:options.py


示例7: stdout_log_scheme

 def stdout_log_scheme():
   """
     Return the current stdout log scheme, initializing it on first use.
   """
   if LogOptions._STDOUT_LOG_SCHEME is not None:
     return LogOptions._STDOUT_LOG_SCHEME
   # Nothing stored yet: apply the stdout log level taken from the app options.
   # set_stdout_log_level is presumably what populates _STDOUT_LOG_SCHEME - confirm.
   LogOptions.set_stdout_log_level(app.get_options().twitter_common_log_stdout_log_level)
   return LogOptions._STDOUT_LOG_SCHEME
开发者ID:billwei,项目名称:commons,代码行数:7,代码来源:options.py


示例8: scheduler_snapshot

def scheduler_snapshot(cluster):
    """usage: scheduler_snapshot cluster

  Request that the scheduler perform a storage snapshot and block until complete.
  """
    options = app.get_options()
    # Look the cluster up by the name passed in. The previous code indexed
    # CLUSTERS with the literal string "cluster", ignoring the argument.
    api = AuroraClientAPI(CLUSTERS[cluster], options.verbosity)
    check_and_log_response(api.snapshot())
开发者ID:Empia,项目名称:incubator-aurora,代码行数:7,代码来源:admin.py


示例9: scribe_port

 def scribe_port():
     """
     Return the port used to connect to the scribe daemon.

     The value is read from the app options on first use and stored on
     LogOptions for later calls.
     """
     if LogOptions._SCRIBE_PORT is not None:
         return LogOptions._SCRIBE_PORT
     LogOptions._SCRIBE_PORT = app.get_options().twitter_common_log_scribe_port
     return LogOptions._SCRIBE_PORT
开发者ID:EricCen,项目名称:commons,代码行数:7,代码来源:options.py


示例10: scribe_host

 def scribe_host():
     """
     Return the host running the scribe daemon.

     The value is read from the app options on first use and stored on
     LogOptions for later calls.
     """
     if LogOptions._SCRIBE_HOST is not None:
         return LogOptions._SCRIBE_HOST
     LogOptions._SCRIBE_HOST = app.get_options().twitter_common_log_scribe_host
     return LogOptions._SCRIBE_HOST
开发者ID:EricCen,项目名称:commons,代码行数:7,代码来源:options.py


示例11: scribe_buffer

 def scribe_buffer():
     """
     Return the buffer setting for scribe logging.

     The value is read from the app options on first use and stored on
     LogOptions for later calls.
     """
     if LogOptions._SCRIBE_BUFFER is not None:
         return LogOptions._SCRIBE_BUFFER
     LogOptions._SCRIBE_BUFFER = app.get_options().twitter_common_log_scribe_buffer
     return LogOptions._SCRIBE_BUFFER
开发者ID:EricCen,项目名称:commons,代码行数:7,代码来源:options.py


示例12: scribe_log_scheme

 def scribe_log_scheme():
     """
     Return the current scribe log scheme, initializing it on first use.
     """
     if LogOptions._SCRIBE_LOG_SCHEME is not None:
         return LogOptions._SCRIBE_LOG_SCHEME
     # Nothing stored yet: apply the scribe log level taken from the app options.
     # set_scribe_log_level is presumably what populates _SCRIBE_LOG_SCHEME - confirm.
     LogOptions.set_scribe_log_level(app.get_options().twitter_common_log_scribe_log_level)
     return LogOptions._SCRIBE_LOG_SCHEME
开发者ID:EricCen,项目名称:commons,代码行数:7,代码来源:options.py


示例13: scribe_category

 def scribe_category():
     """
     Return the category used when logging to the scribe daemon.

     The value is read from the app options on first use and stored on
     LogOptions for later calls.
     """
     if LogOptions._SCRIBE_CATEGORY is not None:
         return LogOptions._SCRIBE_CATEGORY
     LogOptions._SCRIBE_CATEGORY = app.get_options().twitter_common_log_scribe_category
     return LogOptions._SCRIBE_CATEGORY
开发者ID:EricCen,项目名称:commons,代码行数:7,代码来源:options.py


示例14: end_maintenance_hosts

def end_maintenance_hosts(cluster):
  """usage: end_maintenance_hosts {--filename=filename | --hosts=hosts}
                                  cluster
  """
  opts = app.get_options()
  maintenance = HostMaintenance(CLUSTERS[cluster], opts.verbosity)
  # The host list comes from the --filename / --hosts option values.
  hosts = parse_hosts(opts.filename, opts.hosts)
  maintenance.end_maintenance(hosts)
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:7,代码来源:maintenance.py


示例15: main

def main(args):
  # Entry point for the NailGun client: handles --help/--version style options,
  # works out which nailgun command to run, runs it, and exits with its result.
  options = app.get_options()
  if options.show_help:
    app.help()

  if options.show_version or options.just_version:
    print('Python NailGun client version %s' % VERSION)
    if options.just_version:
      sys.exit(0)

  # Assume ng.pex has been aliased to the command name.
  # The dot is escaped so only a literal ".pex" suffix is stripped; the old
  # pattern '.pex$' also ate any other character before "pex" (e.g. "apex").
  command = re.sub(r'\.pex$', '', os.path.basename(sys.argv[0]))
  args_index = 0

  # Otherwise the command name is the 1st arg
  if command == 'ng':
    if not args:
      # NOTE(review): relies on app.help() not returning; otherwise args[0]
      # below raises IndexError - confirm.
      app.help()

    command = args[0]
    args_index = 1

  ng = NailgunClient(host=options.ng_host, port=options.ng_port)
  try:
    # Remaining args are forwarded positionally; the process environment is
    # forwarded as keyword arguments.
    result = ng(command, *args[args_index:], **os.environ)
    sys.exit(result)
  except ng.NailgunError as e:
    print('Problem executing command: %s' % e, file=sys.stderr)
    sys.exit(1)
开发者ID:alfss,项目名称:commons,代码行数:29,代码来源:ng.py


示例16: scheduler_stage_recovery

def scheduler_stage_recovery(cluster, backup_id):
    """usage: scheduler_stage_recovery cluster backup_id

  Stages a backup for recovery.
  """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    # The response is passed through the shared check-and-log helper.
    response = api.stage_recovery(backup_id)
    check_and_log_response(response)
开发者ID:Empia,项目名称:incubator-aurora,代码行数:7,代码来源:admin.py


示例17: scheduler_unload_recovery

def scheduler_unload_recovery(cluster):
    """usage: scheduler_unload_recovery cluster

  Unloads a staged recovery.
  """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    # The response is passed through the shared check-and-log helper.
    response = api.unload_recovery()
    check_and_log_response(response)
开发者ID:Empia,项目名称:incubator-aurora,代码行数:7,代码来源:admin.py


示例18: simple

 def simple():
     """
     Return whether or not simple logging should be used.

     The value is read from the app options on first use and stored on
     LogOptions for later calls.
     """
     if LogOptions._SIMPLE is not None:
         return LogOptions._SIMPLE
     LogOptions._SIMPLE = app.get_options().twitter_common_log_simple
     return LogOptions._SIMPLE
开发者ID:EricCen,项目名称:commons,代码行数:7,代码来源:options.py


示例19: stdout_log_level

 def stdout_log_level():
   """
     Return the current stdout_log_level (in logging units specified by logging module.)
   """
   if LogOptions._STDOUT_LOG_LEVEL is not None:
     return LogOptions._STDOUT_LOG_LEVEL
   # Nothing stored yet: apply the stdout log level taken from the app options.
   # set_stdout_log_level is presumably what populates _STDOUT_LOG_LEVEL - confirm.
   LogOptions.set_stdout_log_level(app.get_options().twitter_common_log_stdout_log_level)
   return LogOptions._STDOUT_LOG_LEVEL
开发者ID:billwei,项目名称:commons,代码行数:7,代码来源:options.py


示例20: handle_open

def handle_open(scheduler_url, role, env, job):
  """Log the job's URL and, if the open_browser option is set, open it in a browser tab.

  Does nothing when synthesize_url() yields a falsy URL.
  """
  job_url = synthesize_url(scheduler_url, role, env, job)
  if not job_url:
    return

  log.info('Job url: %s' % job_url)
  if not app.get_options().open_browser:
    return

  # Imported lazily: only needed when a browser tab is actually opened.
  import webbrowser
  webbrowser.open_new_tab(job_url)
开发者ID:sumanau7,项目名称:incubator-aurora,代码行数:7,代码来源:base.py



注:本文中的twitter.common.app.get_options函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python app.main函数代码示例发布时间:2022-05-27
下一篇:
Python app.error函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap