• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python dicttoolz.merge函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中toolz.dicttoolz.merge函数的典型用法代码示例。如果您正苦于以下问题:Python merge函数的具体用法?Python merge怎么用?Python merge使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了merge函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_added

 def test_added(self):
     """
     The desired, actual, pending, tenant and group totals are each posted
     to cloud metrics as one data point sharing the same time fields.
     """
     desired, actual, pending, tenants, groups = 10, 20, 3, 7, 13
     ttl = 5 * 24 * 60 * 60
     shared = {'collectionTime': 100000, 'ttlInSeconds': ttl}
     # (metric name, metric value) in the order the request is expected
     # to carry them.
     named_values = [
         ('ord.desired', desired),
         ('ord.actual', actual),
         ('ord.pending', pending),
         ('ord.tenants', tenants),
         ('ord.groups', groups),
     ]
     req_data = [
         merge(shared, {'metricValue': value, 'metricName': name})
         for name, value in named_values]
     log = object()
     seq = [
         (Func(time.time), const(100)),
         (service_request(
             ServiceType.CLOUD_METRICS_INGEST, "POST", "ingest",
             data=req_data, log=log).intent, noop)
     ]
     eff = add_to_cloud_metrics(
         ttl, 'ord', desired, actual, pending, tenants, groups, log=log)
     self.assertIsNone(perform_sequence(seq, eff))
开发者ID:pratikmallya,项目名称:otter,代码行数:26,代码来源:test_metrics.py


示例2: test_factory

def test_factory():
    """Check the mapping ``merge`` builds with and without ``factory``."""
    def int_dict():
        return defaultdict(int)

    def merged_with_factory():
        return merge(defaultdict(int, {1: 2}), {2: 3}, factory=int_dict)

    def merged_with_misspelled_keyword():
        return merge({1: 2}, {2: 3}, factoryy=dict)

    without_factory = merge(defaultdict(int, {1: 2}), {2: 3})
    assert without_factory == {1: 2, 2: 3}

    assert merged_with_factory() == defaultdict(int, {1: 2, 2: 3})
    # Written as ``not (a == b)`` on purpose: only ``==`` is exercised here.
    assert not (merged_with_factory() == {1: 2, 2: 3})

    assert raises(TypeError, merged_with_misspelled_keyword)
开发者ID:cpcloud,项目名称:toolz,代码行数:8,代码来源:test_dicttoolz.py


示例3: test_factory

 def test_factory(self):
     """Check the mapping ``merge`` builds with and without ``factory``,
     using the mapping type supplied by the test class as ``self.D``."""
     D, kw = self.D, self.kw

     def int_dict():
         return defaultdict(int)

     def merged_with_factory():
         return merge(defaultdict(int, D({1: 2})), D({2: 3}),
                      factory=int_dict)

     def merged_with_misspelled_keyword():
         return merge(D({1: 2}), D({2: 3}), factoryy=dict)

     without_factory = merge(defaultdict(int, D({1: 2})), D({2: 3}))
     assert without_factory == {1: 2, 2: 3}

     assert merged_with_factory() == defaultdict(int, D({1: 2, 2: 3}))
     # Written as ``not (a == b)`` on purpose: only ``==`` is exercised here.
     assert not (merged_with_factory() == {1: 2, 2: 3})

     assert raises(TypeError, merged_with_misspelled_keyword)
开发者ID:ZachPhillipsGary,项目名称:CS200-NLP-ANNsProject,代码行数:9,代码来源:test_dicttoolz.py


示例4: get_train

def get_train(train_id):
    """
    Return a JSON string describing one train.

    The train's rows are read from the ``train_movements`` table, ordered by
    ``actual_timestamp`` descending, and attached under ``movements`` to the
    info derived from the first row.

    Raises ``IndexError`` when no movement matches ``train_id``.
    """
    conn = g.db_conn
    movements = list(
        r.table('train_movements')
        .filter(r.row['train_id'] == train_id)
        .order_by(r.desc('actual_timestamp'))
        .run(conn))
    payload = merge(get_train_info(movements[0]), {'movements': movements})
    return json.dumps(payload, default=json_formats.date_handler)
开发者ID:FLamparski,项目名称:RailDelay,代码行数:7,代码来源:flask_main.py


示例5: rolling_fit_opt_weights

def rolling_fit_opt_weights(df, opt_weights_func, look_ahead_per):
    """Apply ``opt_weights_func`` to a rolling window over a pandas frame.

    For every row position ``i`` that still has ``look_ahead_per`` rows after
    it, ``opt_weights_func`` receives ``df.iloc[i:i + look_ahead_per + 1]``
    and its result is stored under the index label ``df.index[i]``.

    :param df: pandas DataFrame to slide over
    :param opt_weights_func: callable taking a window (DataFrame slice);
        presumably returns a mapping/Series of weights -- verify against
        callers
    :param look_ahead_per: number of rows following the start row that are
        included in each window
    :return: DataFrame with one row per window start label (empty when no
        full window fits)
    """
    num_rows = df.shape[0]
    # A plain comprehension runs on Python 2 and 3 alike (no ``xrange``) and
    # does not rely on curried ``filter``/``map`` shadowing the builtins.
    # When index labels repeat, the last window wins, as with ``merge``.
    weights_by_label = {
        df.index[start]: opt_weights_func(
            df.iloc[start:start + look_ahead_per + 1])
        for start in range(num_rows)
        if start + look_ahead_per < num_rows
    }
    return pd.DataFrame(weights_by_label).T
开发者ID:rhouck,项目名称:nn_port,代码行数:7,代码来源:opt_weights.py


示例6: __init__

    def __init__(self, name, hist_return=None, industry_weight=None, property_dict=None, **kwargs):
        """
        Set up the benchmark and validate its data.

        :param name: stored as ``self.name``
        :param hist_return: stored as ``self.hist_return``
        :param industry_weight: industry composition weights of the
            benchmark index
        :param property_dict: optional mapping merged over
            ``_REQUIRED_BENCHMARK_PROPERTY``; ``None`` (the default) adds
            nothing
        :param kwargs: ``production_data_format`` is read from here and
            defaults to ``OutputDataFormat.MULTI_INDEX_DF``
        """
        self.name = name
        # Use a None sentinel instead of a mutable default so that no
        # mapping object is shared between calls.
        if property_dict is None:
            property_dict = {}
        self.property = merge(_REQUIRED_BENCHMARK_PROPERTY, property_dict)
        self.production_data_format = kwargs.get('production_data_format', OutputDataFormat.MULTI_INDEX_DF)
        self.hist_return = hist_return
        self.industry_weight = industry_weight  # industry composition weights of the benchmark index

        self._validate_data_format()
        self._validate_date_format()
开发者ID:digideskio,项目名称:alphaware,代码行数:9,代码来源:benchmark.py


示例7: add_to_cloud_metrics

def add_to_cloud_metrics(ttl, region, group_metrics, num_tenants, config,
                         log=None, _print=False):
    """
    Add total number of desired, actual and pending servers of a region
    to Cloud metrics.

    This is a generator: it first yields an effect asking for the current
    time and then yields the ingest service request.

    :param ttl: value sent as ``ttlInSeconds`` with every metric
    :param str region: which region's metric is collected
    :param group_metrics: List of :obj:`GroupMetric`
    :param int num_tenants: total number of tenants
    :param dict config: Config json dict containing convergence tenants info
    :param log: Optional logger
    :param bool _print: Should it print activity on stdout? Useful when running
        as a script

    :return: `Effect` with None
    """
    epoch = yield Effect(Func(time.time))
    # The epoch is in seconds; collectionTime is sent in milliseconds.
    metric_part = {'collectionTime': int(epoch * 1000),
                   'ttlInSeconds': ttl}

    tenanted_metrics, total = calc_total(group_metrics)
    if log is not None:
        log.msg(
            'total desired: {td}, total_actual: {ta}, total pending: {tp}',
            td=total.desired, ta=total.actual, tp=total.pending)
    if _print:
        print('total desired: {}, total actual: {}, total pending: {}'.format(
            total.desired, total.actual, total.pending))

    metrics = [('desired', total.desired), ('actual', total.actual),
               ('pending', total.pending), ('tenants', num_tenants),
               ('groups', len(group_metrics))]
    # Per-tenant metrics, emitted in sorted tenant order.
    for tenant_id, metric in sorted(tenanted_metrics.items()):
        metrics.append(("{}.desired".format(tenant_id), metric.desired))
        metrics.append(("{}.actual".format(tenant_id), metric.actual))
        metrics.append(("{}.pending".format(tenant_id), metric.pending))

    # convergence tenants desired and actual
    conv_tenants = keyfilter(
        partial(tenant_is_enabled,
                get_config_value=lambda k: get_in([k], config)),
        tenanted_metrics)
    # ``.values()`` works on both Python 2 and 3; ``.itervalues()`` is
    # Python 2 only.
    conv_desired = sum(m.desired for m in conv_tenants.values())
    conv_actual = sum(m.actual for m in conv_tenants.values())
    metrics.extend(
        [("conv_desired", conv_desired), ("conv_actual", conv_actual),
         ("conv_divergence", conv_desired - conv_actual)])

    # Every metric name is prefixed with the region.
    data = [merge(metric_part,
                  {'metricValue': value,
                   'metricName': '{}.{}'.format(region, metric)})
            for metric, value in metrics]
    yield service_request(ServiceType.CLOUD_METRICS_INGEST,
                          'POST', 'ingest', data=data, log=log)
开发者ID:dragorosson,项目名称:otter,代码行数:54,代码来源:metrics.py


示例8: get_step_limits_from_conf

def get_step_limits_from_conf(limit_conf):
    """
    Build the step limits, falling back to defaults for steps that
    ``limit_conf`` does not mention.

    :param dict limit_conf: step name -> limit mapping

    :return: `dict` of step class -> limit
    """
    configured = dict(
        (step_conf_to_class[name], limit)
        for name, limit in limit_conf.items())
    # Configured limits take precedence over the defaults.
    return merge(_DEFAULT_STEP_LIMITS, configured)
开发者ID:dragorosson,项目名称:otter,代码行数:12,代码来源:transforming.py


示例9: clean_movement_message

def clean_movement_message(msg, msg_type, conn):
    """
    Return the message body enriched with derived fields.

    For every field of ``msg['body']``:

    * ``*_stanox`` fields are looked up through ``get_geo`` and the result is
      merged into the extra fields;
    * ``*_timestamp`` fields are converted from an integer number of
      milliseconds to ``r.epoch_time`` (best effort: a value that cannot be
      converted is left as it was);
    * the strings ``'true'`` / ``'false'`` become real booleans.

    :param msg: mapping with a ``'body'`` mapping
    :param msg_type: stored under the ``'type'`` key of the result
    :param conn: passed through to ``get_geo``
    :return: the body merged with the extra fields; extras win on conflict
    """
    extras = {'type': msg_type}
    body = msg['body']

    # ``body`` itself is never modified in the loop, only ``extras``.
    for key, value in body.items():
        if key.endswith('_stanox'):
            logger.debug('Train {}: Lookup stanox {} for field {}'.format(body['train_id'], value, key))
            extras = merge(extras, get_geo(value, key[:-len('_stanox')], conn))

        if key.endswith('_timestamp'):
            try:
                logger.debug('Converting timestamp for field {}'.format(key))
                intval = int(value)
                extras[key] = r.epoch_time(intval / 1000.0)
            except Exception:
                # Still best effort, but no longer swallows KeyboardInterrupt
                # or SystemExit, and leaves a trace for debugging.
                logger.debug('Could not convert timestamp for field {}'.format(key), exc_info=True)

        if value in ('true', 'false'):
            extras[key] = value == 'true'

    return merge(body, extras)
开发者ID:FLamparski,项目名称:RailDelay,代码行数:21,代码来源:train_movements.py


示例10: _renderDirectory

 def _renderDirectory(self, ruleHits, ruleStats, directory, filename):
     """
     Write the JSON files for one output directory.

     For every rule with hits, ``<rule.machine_name>.json`` is written into
     ``directory``; for a rule without hits an existing file of that name is
     removed. Afterwards ``index.json`` is written with the per-rule and
     per-file statistics.

     :param ruleHits: mapping rule -> list of
         ``(entry, hit, filename, origImages, translatedImages)`` tuples
     :param ruleStats: mapping rule -> number of hits
     :param directory: directory the JSON files are written to
     :param filename: NOTE(review): wherever ``filename`` is read below it is
         a comprehension variable, so this argument appears to be unused --
         confirm before relying on it
     """
     # Generate the JSON output for each rule
     for rule, hits in ruleHits.items():
         # Render hits for individual rule
         outfilePathJSON = os.path.join(directory, rule.machine_name + ".json")
         if len(hits) > 0:  # Render hits
             # Generate JSON API
             jsonAPI = {
                 "timestamp": self.timestamp,
                 "downloadTimestamp": self.downloadTimestamp,
                 "rule": rule.meta_dict,
                 "hits": [valfilter(bool, {"msgstr": entry.msgstr, # valfilter: remove empty values for smaller JSON
                                           "msgid": entry.msgid,
                                           "tcomment": entry.tcomment,
                                           "hit": hit,
                                           "origImages": origImages,
                                           "translatedImages": translatedImages,
                                           "crowdinLink": "{0}#q={1}".format(self.translationURLs[filename], genCrowdinSearchString(entry))
                                           })
                          # ``filename`` here is the hit's own filename, not the method argument
                          for entry, hit, filename, origImages, translatedImages in hits]
             }
             writeJSONToFile(outfilePathJSON, jsonAPI)
         else:  # Remove file (redirects to 404 file) if there are no hits
             if os.path.isfile(outfilePathJSON):
                 os.remove(outfilePathJSON)
     # Render file index page (no filelist)
     # Only rules with at least one hit are listed
     ruleInfos = [merge(rule.meta_dict, {"num_hits": ruleStats[rule]})
                  for rule in self.rules if ruleStats[rule] > 0]
     ruleInfos.sort(key=lambda o: -o["severity"])  # Invert sort order: highest severity first
     js = {
         "pageTimestamp": self.timestamp,
         "downloadTimestamp": self.downloadTimestamp,
         "stats": ruleInfos,
         # Only files with at least one notice are listed
         "files": [merge(self.statsByFile[filename], {"filename": filename})
                   for filename in self.files
                   if self.statsByFile[filename]["notices"] > 0]
     }
     writeJSONToFile(os.path.join(directory, "index.json"), js)
开发者ID:ulikoehler,项目名称:KATranslationCheck,代码行数:38,代码来源:check.py


示例11: prepare_server_launch_config

def prepare_server_launch_config(group_id, server_config, lb_descriptions):
    """
    Return the server part of a Group's launch config with the dynamically
    generated metadata merged into its existing metadata.

    :param str group_id: The group ID
    :param PMap server_config: The server part of the Group's launch config,
        as per :obj:`otter.json_schema.group_schemas.server` except as the
        value of a one-element PMap with key "server".
    :param iterable lb_descriptions: iterable of
        :class:`ILBDescription` providers
    """
    metadata_path = ('server', 'metadata')
    existing = get_in(metadata_path, server_config, {})
    generated = generate_metadata(group_id, lb_descriptions)
    # Generated entries take precedence over the ones already present.
    return set_in(server_config, metadata_path, merge(existing, generated))
开发者ID:pratikmallya,项目名称:otter,代码行数:17,代码来源:composition.py


示例12: mark_deleted_servers

def mark_deleted_servers(old, new):
    """
    Given lists of old and new servers, return a list of all servers, with
    the deleted ones (present in ``old`` but not in ``new``) annotated with
    a status of DELETED.

    Servers are matched by their ``'id'`` key; for a server present in both
    lists the entry from ``new`` is used.

    :param list old: List of old servers
    :param list new: List of latest servers
    :return: List of updated servers
    """

    def sdict(servers):
        # Index the servers by id.
        return {s['id']: s for s in servers}

    old = sdict(old)
    new = sdict(new)
    deleted_ids = set(old) - set(new)
    for sid in deleted_ids:
        old[sid] = assoc(old[sid], "status", "DELETED")
    # Materialise the result: on Python 3 ``.values()`` is only a view,
    # while the documented return type is a list.
    return list(merge(old, new).values())
开发者ID:rackerlabs,项目名称:otter,代码行数:19,代码来源:gathering.py


示例13: computeRuleHitsForFileSet

    def computeRuleHitsForFileSet(self, poFiles):
        """
        For each file in the given filename -> PO object dictionary,
        compute the Rule -> Hits dictionary.

        Stores the information in the current instance (``files``,
        ``fileRuleHits``, ``statsByFile``, ``statsByFileAndRule`` and
        ``totalStatsByRule``).
        Does not return anything
        """
        # Sorted list of filenames
        self.files = sorted(poFiles.keys())
        # Collect the futures of all files into one flat list
        futures = list(itertools.chain.from_iterable(
            self.computeRuleHits(po, filename)
            for filename, po in poFiles.items()))
        # Process the results in first-received order
        self.fileRuleHits = collections.defaultdict(dict)
        n_finished = 0
        for future in concurrent.futures.as_completed(futures):
            # Extract result
            filename, rule, result = future.result()
            self.fileRuleHits[filename][rule] = result
            # Track progress: report after every 1000 finished futures
            n_finished += 1
            if n_finished % 1000 == 0:
                percent_finished = n_finished * 100. / len(futures)
                print("Rule computation finished {0:.2f} %".format(percent_finished))

        # Compute total stats by file
        self.statsByFile = {
            filename: merge(self.ruleHitsToSeverityCountMap(ruleHits), {
                            "translation_url": self.translationURLs[filename]})
            for filename, ruleHits in self.fileRuleHits.items()
        }
        # Compute map filename -> {rule: numHits for rule}
        self.statsByFileAndRule = {
            filename: valmap(len, ruleHits)
            for filename, ruleHits in self.fileRuleHits.items()
        }
        # Compute map rule -> numHits for rule
        self.totalStatsByRule = merge_with(sum, *(self.statsByFileAndRule.values()))
开发者ID:ulikoehler,项目名称:KATranslationCheck,代码行数:41,代码来源:check.py


示例14: unchanged_divergent_groups

def unchanged_divergent_groups(clock, current, timeout, group_metrics):
    """
    Work out which groups have stayed divergent, with unchanged numbers, for
    at least ``timeout`` seconds, and update the tracking state.

    :param IReactorTime clock: Twisted time used to track
    :param dict current: Currently tracked divergent groups
    :param float timeout: Timeout in seconds
    :param list group_metrics: List of group metrics

    :return: (updated current, List of (group, divergent_time) tuples)
    """
    def is_converged(gm):
        return gm.actual + gm.pending == gm.desired

    converged, diverged = partition_bool(is_converged, group_metrics)

    # Converged groups and groups that no longer exist are not tracked.
    vanished = set(current.keys()) - metrics_set(group_metrics)
    tracked = current.copy()
    for key in metrics_set(converged) | vanished:
        tracked.pop(key, None)

    now = clock.seconds()
    overdue = []
    newly_tracked = {}
    for gm in diverged:
        key = (gm.tenant_id, gm.group_id)
        fingerprint = hash((gm.desired, gm.actual, gm.pending))
        if key not in tracked:
            # First time this group is seen diverged: start tracking it.
            newly_tracked[key] = now, fingerprint
            continue
        since, seen_fingerprint = tracked[key]
        if seen_fingerprint != fingerprint:
            # The numbers moved, so the group is no longer "unchanged".
            del tracked[key]
            continue
        elapsed = now - since
        # Report on intervals of timeout. For example, if timeout is 1 hr
        # then report every hour it remains diverged.
        if elapsed > timeout and elapsed % timeout <= 60:
            overdue.append((gm, elapsed))
    return merge(tracked, newly_tracked), overdue
开发者ID:dragorosson,项目名称:otter,代码行数:37,代码来源:metrics.py


示例15: dict

        str : None,
        object : None,
        Union : lambda xs: xs[0],
        List : lambda x : [examples[x]]*3, 
        } 
# Handler producing the description of a plain type: its type, an unset
# value and the example registered for it (None when there is none).
simple = lambda t: {'name': {'type': t, 'value': None, 'example': examples.get(t)}}
primitives = [int, bool, str, float]
# Every primitive type is described by the same ``simple`` handler.
primdict = {primitive: simple for primitive in primitives}
# Described by class name instead of by the class object itself.
enum = compose(simple, lambda x: x.__name__)
def handle_union(xs):
    """Describe a union from the descriptions of its members.

    Each item of ``xs`` must provide ``item['name']['type']``; the collected
    types become the choices and the first one is used as the example.
    Raises ``IndexError`` when ``xs`` is empty.
    """
    member_types = [member['name']['type'] for member in xs]
    return {'name': {'choices': member_types,
                     'example': member_types[0],
                     'value': None}}
def handle_list(t):
    """Describe a list type from an iterator over its parameters.

    Only the first item of ``t`` is used (presumably the element type --
    confirm against ``traverse_type``). The example is produced by the
    ``examples[List]`` callable.

    Returns a dict shaped like the other handlers' results. The original
    had a trailing comma after the ``return`` expression, which wrapped the
    dict in a 1-tuple and broke consumers that index the result with
    ``['name']`` (for instance ``handle_union``).
    """
    t = next(t)
    return dict(name=dict(type=t, value=None, example=examples[List](t)))
# Type -> handler table: the primitive handlers plus the special cases.
tfuncs = merge(
    primdict,
    {
        object: enum,
        NamedTuple: enum,
        # Same description as a plain type, flagged as optional.
        Optional: lambda x: merge(simple(x), {'optional': True}),
        List: handle_list,
        Union: handle_union,
    },
)
#{n : t for n,t in res =  traverse_type(TrimOpts, tfuncs)
from functools import reduce 
from itertools import starmap
#res = reduce(merge, map(lambda x: traverse_type(x, tfuncs), TrimOpts._field_types), {})
print(TrimOpts.__dict__)
# Describe every field of TrimOpts with the handler table.
res = {
    field_name: traverse_type(field_type, tfuncs)
    for field_name, field_type in TrimOpts._field_types.items()
}
print(res)
开发者ID:averagehat,项目名称:mypy-extras,代码行数:30,代码来源:typeyaml.py


示例16: _server

def _server(group, state):
    """Return the first sample server with its status set to ``state`` and
    its metadata replaced by one tagging it as a member of ``group``."""
    overrides = {
        "status": state,
        "metadata": {"rax:auto_scaling_group_id": group},
    }
    return merge(sample_servers()[0], overrides)
开发者ID:dragorosson,项目名称:otter,代码行数:3,代码来源:test_metrics.py


示例17: test_added

    def test_added(self):
        """
        Region totals, per-tenant numbers and convergence numbers are all
        posted to cloud metrics, and the totals are logged once.
        """
        metrics = [
            GroupMetrics("t1", "g1", 3, 2, 0),
            GroupMetrics("t2", "g1", 4, 4, 1),
            GroupMetrics("t2", "g", 100, 20, 0),
            GroupMetrics("t3", "g3", 5, 3, 0),
        ]
        config = {"non-convergence-tenants": ["t1"]}
        ttl = 5 * 24 * 60 * 60
        m = {"collectionTime": 100000, "ttlInSeconds": ttl}
        # (metric name, metric value) in the order the request must list them
        expected_points = [
            ("ord.desired", 112),
            ("ord.actual", 29),
            ("ord.pending", 1),
            ("ord.tenants", 3),
            ("ord.groups", 4),
            ("ord.t1.desired", 3),
            ("ord.t1.actual", 2),
            ("ord.t1.pending", 0),
            ("ord.t2.desired", 104),
            ("ord.t2.actual", 24),
            ("ord.t2.pending", 1),
            ("ord.t3.desired", 5),
            ("ord.t3.actual", 3),
            ("ord.t3.pending", 0),
            ("ord.conv_desired", 109),
            ("ord.conv_actual", 27),
            ("ord.conv_divergence", 82),
        ]
        req_data = [
            merge(m, {"metricValue": value, "metricName": name})
            for name, value in expected_points
        ]

        log = mock_log()
        seq = [
            (Func(time.time), const(100)),
            (service_request(ServiceType.CLOUD_METRICS_INGEST, "POST", "ingest", data=req_data, log=log).intent, noop),
        ]
        num_tenants = 3
        eff = add_to_cloud_metrics(ttl, "ord", metrics, num_tenants, config, log)
        self.assertIsNone(perform_sequence(seq, eff))
        log.msg.assert_called_once_with(
            "total desired: {td}, total_actual: {ta}, total pending: {tp}", td=112, ta=29, tp=1
        )
开发者ID:dragorosson,项目名称:otter,代码行数:41,代码来源:test_metrics.py


示例18: searchNoType

 def searchNoType(self, request, params):
     """Return a search result resource for ``params`` layered over
     ``self.params``, with ``searchType`` forced to ``None``."""
     store = self.store
     untyped_params = merge(self.params, params, {'searchType': None})
     return SearchResultResource(store=store, params=untyped_params)
开发者ID:featherlightly,项目名称:fusion-index,代码行数:4,代码来源:resource.py


示例19: got_additional_headers

 def got_additional_headers(additional_headers):
     """Issue the request with ``additional_headers`` layered over the
     enclosing scope's ``headers``."""
     combined_headers = merge(headers, additional_headers)
     return request_func(*args, headers=combined_headers, **kwargs)
开发者ID:rackerlabs,项目名称:otter,代码行数:4,代码来源:pure_http.py


示例20: update_factor_property

def update_factor_property(factor_property):
    """Return ``factor_property`` layered over the required factor
    properties; entries in ``factor_property`` take precedence."""
    return merge(_REQUIRED_FACTOR_PROPERTY, factor_property)
开发者ID:digideskio,项目名称:alphaware,代码行数:3,代码来源:factor_container.py



注:本文中的toolz.dicttoolz.merge函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python functoolz.curry函数代码示例发布时间:2022-05-27
下一篇:
Python curried.pipe函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap