• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python db.DatabaseHandler类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mediawords.db.DatabaseHandler的典型用法代码示例。如果您正苦于以下问题:Python DatabaseHandler类的具体用法?Python DatabaseHandler怎么用?Python DatabaseHandler使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DatabaseHandler类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: store_content

def store_content(db: DatabaseHandler, download: dict, content: str) -> dict:
    """Store the content for the download and return the refreshed download row.

    The content is written through the store returned by _get_store_for_writing(); afterwards the download's
    'state', 'path' and 'error_message' columns are updated.

    Arguments:
    db - database handle
    download - download dict; 'downloads_id' and 'state' are read, plus 'error_message' when the state is 'feed_error'
    content - content to store

    Returns:
    the download row as re-read from the database after the update

    Raises:
    McDBIDownloadsException - if storing the content fails (the underlying error is chained as the cause)
    """
    download = decode_object_from_bytes_if_needed(download)
    content = decode_object_from_bytes_if_needed(content)

    # feed_error state indicates that the download was successful but that there was a problem
    # parsing the feed afterward.  so we want to keep the feed_error state even if we redownload
    # the content
    new_state = 'success' if download['state'] != 'feed_error' else 'feed_error'

    try:
        path = _get_store_for_writing().store_content(db, download['downloads_id'], content)
    except Exception as ex:
        # Chain the original exception so that its traceback is not lost
        raise McDBIDownloadsException(
            "error while trying to store download %d: %s" % (download['downloads_id'], ex)
        ) from ex

    # A successful store clears any stale error message; a feed_error download keeps its message
    if new_state == 'success':
        download['error_message'] = ''

    db.update_by_id(
        table='downloads',
        object_id=download['downloads_id'],
        update_hash={'state': new_state, 'path': path, 'error_message': download['error_message']},
    )

    # Re-read the row so that the caller gets the values as they are stored in the database
    download = db.find_by_id('downloads', download['downloads_id'])

    return download
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:downloads.py


示例2: _create_child_download_for_story

def _create_child_download_for_story(db: DatabaseHandler, story: dict, parent_download: dict) -> None:
    """Queue a pending 'content' download for the story's URL as a child of the parent download.

    The feed and priority are copied from the parent download. If the story's media source has a non-zero
    content_delay, the download time is pushed that many hours into the future."""
    story = decode_object_from_bytes_if_needed(story)
    parent_download = decode_object_from_bytes_if_needed(parent_download)

    child_download = {
        'feeds_id': parent_download['feeds_id'],
        'stories_id': story['stories_id'],
        'parent': parent_download['downloads_id'],
        'url': story['url'],
        'host': get_url_host(story['url']),
        'type': 'content',
        'sequence': 1,
        'state': 'pending',
        'priority': parent_download['priority'],
        'extracted': False,
    }

    delay_rows = db.query("""
        SELECT content_delay
        FROM media
        WHERE media_id = %(media_id)s
    """, {'media_id': story['media_id']}).flat()
    delay_hours = delay_rows[0]

    if delay_hours:
        # Delay download of content this many hours. This is useful for sources that are likely to significantly
        # change content in the hours after it is first published.
        current_epoch = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
        scheduled_epoch = current_epoch + (delay_hours * 60 * 60)
        child_download['download_time'] = get_sql_date_from_epoch(scheduled_epoch)

    db.create(table='downloads', insert_hash=child_download)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:31,代码来源:stories.py


示例3: process_download_for_extractor

def process_download_for_extractor(db: DatabaseHandler,
                                   download: dict,
                                   extractor_args: PyExtractorArguments = PyExtractorArguments()) -> None:
    """Extract a single download and, once the story has nothing left to extract, process the story.

    The download text is created via extract_and_create_download_text(). Afterwards, if the story has no 'content'
    download with extracted = 'f' left, the story is loaded and handed over to process_extracted_story()."""
    download = decode_object_from_bytes_if_needed(download)
    stories_id = download['stories_id']

    log.debug("extract: {} {} {}".format(download['downloads_id'], stories_id, download['url']))

    extract_and_create_download_text(db=db, download=download, extractor_args=extractor_args)

    unextracted_download = db.query("""
        SELECT downloads_id
        FROM downloads
        WHERE stories_id = %(stories_id)s
          AND extracted = 'f'
          AND type = 'content'
    """, {'stories_id': stories_id}).hash()

    # MC_REWRITE_TO_PYTHON: Perlism -- the result may be None instead of a row
    if unextracted_download is not None and len(unextracted_download) > 0:
        log.info("Pending more downloads...")
        return

    # Nothing left to extract for this story, so move on to processing the story itself
    story = db.find_by_id(table='stories', object_id=stories_id)
    process_extracted_story(db=db, story=story, extractor_args=extractor_args)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:32,代码来源:downloads.py


示例4: get_links_from_story_text

def get_links_from_story_text(db: DatabaseHandler, story: dict) -> typing.List[str]:
    """Collect every URL found by a simple regex in the story's download texts, title and description.

    The download texts of all of the story's downloads are joined (ordered by download_texts_id); the title and
    the description are appended when they are not None. Trailing non-word characters are stripped from each URL."""
    download_ids = db.query("""
        SELECT downloads_id
        FROM downloads
        WHERE stories_id = %(stories_id)s
        """, {'stories_id': story['stories_id']}
    ).flat()

    download_texts = db.query("""
        SELECT *
        FROM download_texts
        WHERE downloads_id = ANY(%(download_ids)s)
        ORDER BY download_texts_id
        """, {'download_ids': download_ids}
    ).hashes()

    # Pieces of text to search, separated by single spaces once joined
    text_parts = [' '.join(row['download_text'] for row in download_texts)]
    for field in ('title', 'description'):
        value = story[field]
        if value is not None:
            text_parts.append(str(value))
    story_text = ' '.join(text_parts)

    return [
        re.sub(r'\W+$', '', found_url)
        for found_url in re.findall(r'https?://[^\s\")]+', story_text)
    ]
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:extract_story_links.py


示例5: create

def create(db: DatabaseHandler, download: dict, extract: dict) -> dict:
    """Replace the download's download_texts row with one built from the extract.

    Any existing download_texts row for the download is deleted, a new row holding extract['extracted_text'] is
    inserted, and the download is flagged as extracted. Returns the newly inserted download_texts row."""

    # FIXME don't pass freeform "extract" dict, we need just the "extracted_text"

    download = decode_object_from_bytes_if_needed(download)
    extract = decode_object_from_bytes_if_needed(extract)

    downloads_id = download['downloads_id']

    # Get rid of the previous extraction result, if any
    db.query("""
        DELETE FROM download_texts
        WHERE downloads_id = %(downloads_id)s
    """, {'downloads_id': downloads_id})

    insert_params = {
        'downloads_id': downloads_id,
        'download_text': extract['extracted_text'],
    }
    inserted_row = db.query("""
        INSERT INTO download_texts (downloads_id, download_text, download_text_length)
        VALUES (%(downloads_id)s, %(download_text)s, CHAR_LENGTH(%(download_text)s))
        RETURNING *
    """, insert_params).hash()

    # Flag the download as extracted
    db.query("""
        UPDATE downloads
        SET extracted = 't'
        WHERE downloads_id = %(downloads_id)s
    """, {'downloads_id': downloads_id})

    return inserted_row
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:30,代码来源:download_texts.py


示例6: lookup_medium

def lookup_medium(db: DatabaseHandler, url: str, name: str) -> typing.Optional[dict]:
    """Find a media source by its normalized url, falling back to a case-insensitive name match.

    Media marked foreign_rss_links are excluded from both lookups. If the matched medium is a duplicate
    (dup_media_id is set), the chain of dup_media_id references is followed and the final parent is returned.

    _update_media_normalized_urls() is called first; per the original notes its job is to keep the
    media.normalized_url column filled in, because the normalization itself is implemented in Python.

    Arguments:
    db - db handle
    url - url to lookup
    name - name to lookup

    Returns:
    a media source dict or None

    Raises:
    McTopicMediaException - if the duplicate chain contains a cycle, or if the final parent has foreign_rss_links

    """
    _update_media_normalized_urls(db)

    normalized_url = _normalize_url(url)

    medium = db.query(
        """
        select m.*
            from media m
            where
                m.normalized_url = %(a)s and
                foreign_rss_links = 'f'
            order by dup_media_id asc nulls last, media_id asc
        """,
        {'a': normalized_url}).hash()

    # No match by url, so try the name instead
    if medium is None:
        medium = db.query(
            "select m.* from media m where lower(m.name) = lower(%(a)s) and m.foreign_rss_links = false",
            {'a': name}).hash()

    if medium is None:
        return None

    # media_ids already visited while walking the duplicate chain; kept as a dict so that the cycle error message
    # stays exactly as it was
    visited_media_ids = dict()  # type: dict
    while medium['dup_media_id'] is not None:
        current_media_id = medium['media_id']
        if current_media_id in visited_media_ids:
            raise McTopicMediaException('Cycle found in duplicate media path: ' + str(visited_media_ids.keys()))
        visited_media_ids[current_media_id] = True

        medium = db.query("select * from media where media_id = %(a)s", {'a': medium['dup_media_id']}).hash()

    if medium['foreign_rss_links']:
        raise McTopicMediaException('Parent duplicate media source %d has foreign_rss_links' % medium['media_id'])

    return medium
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:56,代码来源:media.py


示例7: _insert_tweet_urls

def _insert_tweet_urls(db: DatabaseHandler, topic_tweet: dict, urls: typing.List) -> None:
    """Insert list of urls into topic_tweet_urls.

    One row is inserted per url for topic_tweet['topic_tweets_id']; rows that conflict with existing ones are
    skipped ("on conflict do nothing"). Nothing is returned.

    Arguments:
    db - database handle
    topic_tweet - topic tweet dict; 'topic_tweets_id' is read
    urls - urls to insert
    """
    for url in urls:
        db.query(
            """
            insert into topic_tweet_urls( topic_tweets_id, url )
                values( %(a)s, %(b)s )
                on conflict do nothing
            """,
            {'a': topic_tweet['topic_tweets_id'], 'b': url})
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:10,代码来源:fetch_topic_tweets.py


示例8: change_password

def change_password(db: DatabaseHandler,
                    email: str,
                    new_password: str,
                    new_password_repeat: str,
                    do_not_inform_via_email: bool = False) -> None:
    """Change user's password.

    Arguments:
    db - database handle
    email - email address of the user whose password is to be changed
    new_password - new password
    new_password_repeat - new password, repeated
    do_not_inform_via_email - if true, no notification email about the change is sent

    Raises:
    McAuthChangePasswordException - if the user lookup fails, the new password does not validate, the password
        can not be hashed, or the notification email can not be sent (the password has already been changed by
        then). Underlying errors are chained as the cause.
    """

    email = decode_object_from_bytes_if_needed(email)
    new_password = decode_object_from_bytes_if_needed(new_password)
    new_password_repeat = decode_object_from_bytes_if_needed(new_password_repeat)

    if isinstance(do_not_inform_via_email, bytes):
        do_not_inform_via_email = decode_object_from_bytes_if_needed(do_not_inform_via_email)

    # Accept values such as 0 / 1 / "0" / "1" in addition to real booleans
    do_not_inform_via_email = bool(int(do_not_inform_via_email))

    # Check if user exists
    try:
        user = user_info(db=db, email=email)
    except Exception as ex:
        # Keep the original error as the cause; it might be something other than a missing user
        raise McAuthChangePasswordException('User with email address "%s" does not exist.' % email) from ex

    password_validation_message = validate_new_password(email=email,
                                                        password=new_password,
                                                        password_repeat=new_password_repeat)
    if password_validation_message:
        raise McAuthChangePasswordException("Unable to change password: %s" % password_validation_message)

    # Hash + validate the password
    try:
        password_new_hash = generate_secure_hash(password=new_password)
    except Exception as ex:
        raise McAuthChangePasswordException("Unable to hash a new password: %s" % str(ex)) from ex

    if not password_new_hash:
        raise McAuthChangePasswordException("Generated password hash is empty.")

    # Set the password hash
    db.query("""
        UPDATE auth_users
        SET password_hash = %(password_hash)s,
            active = TRUE
        WHERE email = %(email)s
    """, {
        'email': email,
        'password_hash': password_new_hash,
    })

    if not do_not_inform_via_email:

        message = AuthPasswordChangedMessage(to=email, full_name=user.full_name())
        if not send_email(message):
            raise McAuthChangePasswordException(
                'The password has been changed, but I was unable to send an email notifying you about the change.'
            )
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:55,代码来源:change_password.py


示例9: __get_topic_url_variants

def __get_topic_url_variants(db: DatabaseHandler, urls: List[str]) -> List[str]:
    """Return the urls of the stories matching the given urls, plus urls of topic links that point at them.

    The stories matching the urls are expanded via __get_merged_stories_ids(); if that yields nothing, the input
    urls are returned as they are. Otherwise the result is the distinct set of url and redirect_url values of
    topic_links that reference those stories, together with the urls of the stories themselves."""

    urls = decode_object_from_bytes_if_needed(urls)

    # MC_REWRITE_TO_PYTHON: change to tuple parameter because Perl database handler proxy can't handle tuples
    matched_ids = db.query("SELECT stories_id FROM stories WHERE url = ANY(?)", urls).flat()

    # MC_REWRITE_TO_PYTHON: Perl database handler proxy (the dreaded "wantarray" part) returns None on empty result
    # sets, a scalar on a single item and arrayref on many items
    if matched_ids is None:
        matched_ids = []
    if isinstance(matched_ids, int):
        matched_ids = [matched_ids]

    stories_ids = list(map(int, matched_ids))

    all_stories_ids = __get_merged_stories_ids(db=db, stories_ids=stories_ids)
    if len(all_stories_ids) == 0:
        return urls

    variant_urls = db.query("""
        SELECT DISTINCT url
        FROM (
            SELECT redirect_url AS url
            FROM topic_links
            WHERE ref_stories_id = ANY(?)

            UNION

            SELECT url
            FROM topic_links
            WHERE ref_stories_id = ANY(?)

            UNION

            SELECT url
            FROM stories
            WHERE stories_id = ANY(?)
        ) AS q
        WHERE q IS NOT NULL
    """, all_stories_ids, all_stories_ids, all_stories_ids).flat()

    # MC_REWRITE_TO_PYTHON: same "wantarray" normalization as above
    if variant_urls is None:
        return []
    if isinstance(variant_urls, str):
        return [variant_urls]

    return variant_urls
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:54,代码来源:variants.py


示例10: __remove_object_from_cache

    def __remove_object_from_cache(self, db: DatabaseHandler, object_id: int) -> None:
        """Delete the cache table row of the given object.

        Nothing is caught here, so whatever the database handler raises propagates to the caller -- after removal
        we'd expect the object to be gone for good."""

        prepared_object_id = self._prepare_object_id(object_id)

        # The table name is interpolated by Python right here; the doubled percent sign leaves an
        # "%(object_id)s" placeholder behind for psycopg2 to interpolate
        delete_sql = "DELETE FROM %s WHERE object_id = %%(object_id)s" % self.__cache_table

        db.query(delete_sql, {'object_id': prepared_object_id})
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:11,代码来源:cached_amazon_s3.py


示例11: create_password_reset_token

def create_password_reset_token(db: DatabaseHandler, email: str) -> Optional[str]:
    """Generate password reset token used for both activating newly registered users and resetting passwords.

    A random token is generated, hashed, and the hash is stored for the user with the given email address.

    Returns the non-hashed password reset token.
    NOTE(review): the token is returned even when no user matched the email address (only the database update
    becomes a no-op); earlier documentation mentioned None for that case -- confirm what callers expect.

    Raises McAuthProfileException if the email address is empty or the token can not be generated or hashed.
    """

    email = decode_object_from_bytes_if_needed(email)

    if not email:
        raise McAuthProfileException('Email address is empty.')

    # Check if the email address exists in the user table; if not, pretend that we sent the activation link with a
    # "success" message. That way the adversary would not be able to find out which email addresses are active users.
    #
    # (Possible improvement: make the script work for the exact same amount of time in both cases to avoid timing
    # attacks)
    matching_user = db.query("""
        SELECT auth_users_id,
               email
        FROM auth_users
        WHERE email = %(email)s
        LIMIT 1
    """, {'email': email}).hash()

    user_found = matching_user is not None and len(matching_user) != 0
    if not user_found:
        # Blank out the email address instead of returning right away: the rather slowish token generation below
        # still runs, which reduces the risk of timing attacks
        email = ''

    # Generate the activation token
    token = random_string(length=64)
    if len(token) == 0:
        raise McAuthProfileException('Unable to generate an activation token.')

    # Hash + validate the activation token
    token_hash = generate_secure_hash(password=token)
    if not token_hash:
        raise McAuthProfileException("Unable to hash an activation token.")

    # Set the activation token hash in the database (if the email address doesn't exist, this query will do nothing)
    update_params = {
        'email': email,
        'password_reset_token_hash': token_hash,
    }
    db.query("""
        UPDATE auth_users
        SET password_reset_token_hash = %(password_reset_token_hash)s
        WHERE email = %(email)s
          AND email != ''
    """, update_params)

    return token
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:50,代码来源:profile.py


示例12: get_consistent_color

def get_consistent_color(db: DatabaseHandler, item_set: str, item_id: str) -> str:
    """Return the same hex color (e.g. "ff0000") for the same set / ID combination every time this is called.

    A color already stored in color_sets for the combination is reused. Otherwise the first color of the hard coded
    palette that the set does not use yet is picked (or a random one if all are taken), stored and returned."""
    item_set = decode_object_from_bytes_if_needed(item_set)
    item_id = decode_object_from_bytes_if_needed(item_id)

    # Always return grey for null or not typed values
    if item_id.lower() in {'null', 'not typed'}:
        return '999999'

    stored_color = db.query("""SELECT color FROM color_sets WHERE color_set = %(item_set)s AND id = %(item_id)s""", {
        'item_set': item_set,
        'item_id': item_id,
    }).flat()
    if stored_color is not None and len(stored_color):
        return stored_color[0] if isinstance(stored_color, list) else stored_color

    set_colors = db.query("""SELECT color FROM color_sets WHERE color_set = %(item_set)s""", {
        'item_set': item_set,
    }).flat()

    # The result might be None, a single value or a list; normalize it into a set of colors already in use
    if set_colors is None:
        taken_colors = set()
    elif isinstance(set_colors, list):
        taken_colors = set(set_colors)
    else:
        taken_colors = {set_colors}

    # Use the hard coded palette if possible
    new_color = next((candidate for candidate in __MC_COLORS if candidate not in taken_colors), None)

    # Otherwise, just generate a random color
    if new_color is None:
        new_color = random.choice(analogous_color(color='0000ff', return_slices=256, split_slices=255))

    db.create(table='color_sets', insert_hash={
        'color_set': item_set,
        'id': item_id,
        'color': new_color,
    })

    return new_color
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:50,代码来源:colors.py


示例13: _add_topic_tweet_single_day

def _add_topic_tweet_single_day(
        db: DatabaseHandler,
        topic: dict,
        day: datetime.datetime,
        ch_class: typing.Type[AbstractCrimsonHexagon]) -> dict:
    """
    Create the topic_tweet_days row for the given topic and day from posts fetched through ch_class.

    An existing row for the day whose tweets were not fetched yet is deleted and recreated.

    Arguments:
    db - database handle
    topic - topic dict
    day - date to fetch eg '2017-12-30'
    ch_class - AbstractCrimsonHexagon class

    Return:
    the created topic_tweet_days row, with the fetched posts attached under the 'ch_posts' key

    Raises:
    McFetchTopicTweetDateFetchedException - if the tweets for that day have been fetched already
    """
    # the perl-python layer was segfaulting until I added the str() around day below -hal
    existing_day = db.query(
        "select * from topic_tweet_days where topics_id = %(a)s and day = %(b)s",
        {'a': topic['topics_id'], 'b': str(day)}).hash()

    if existing_day is not None:
        if existing_day['tweets_fetched']:
            raise McFetchTopicTweetDateFetchedException("tweets already fetched for day " + str(day))

        # if we have a ttd but had not finished fetching tweets, delete it and start over
        db.delete_by_id('topic_tweet_days', existing_day['topic_tweet_days_id'])

    ch_posts = ch_class.fetch_posts(topic['ch_monitor_id'], day)

    total_available = ch_posts['totalPostsAvailable']
    fetched_count = len(ch_posts['posts'])

    new_day_row = {
        'topics_id': topic['topics_id'],
        'day': day,
        'tweet_count': total_available,
        'num_ch_tweets': fetched_count,
        'tweets_fetched': False
    }
    created_day = db.create('topic_tweet_days', new_day_row)

    created_day['ch_posts'] = ch_posts

    return created_day
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:48,代码来源:fetch_topic_tweets.py


示例14: regenerate_api_key

def regenerate_api_key(db: DatabaseHandler, email: str) -> None:
    """Regenerate API key -- creates new non-IP limited API key, removes all IP-limited API keys.

    The key changes and the notification email are done within a single transaction: if a query fails, or the
    email can not be sent, the transaction is rolled back and the keys are left untouched.

    Arguments:
    db - database handle
    email - email address of the user whose API key is to be regenerated

    Raises:
    McAuthProfileException - if the email address is empty, the user lookup fails, or the notification email can
        not be sent
    """

    email = decode_object_from_bytes_if_needed(email)

    if not email:
        raise McAuthProfileException('Email address is empty.')

    # Check if user exists
    try:
        user = user_info(db=db, email=email)
    except Exception as ex:
        # Keep the original error as the cause; it might be something other than a missing user
        raise McAuthProfileException("User with email address '%s' does not exist." % email) from ex

    db.begin()

    try:
        # Purge all IP-limited API keys
        db.query("""
        DELETE FROM auth_user_api_keys
        WHERE ip_address IS NOT NULL
          AND auth_users_id = (
            SELECT auth_users_id
            FROM auth_users
            WHERE email = %(email)s
          )
    """, {'email': email})

        # Regenerate non-IP limited API key
        db.query("""
        UPDATE auth_user_api_keys

        -- DEFAULT points to a generation function
        SET api_key = DEFAULT

        WHERE ip_address IS NULL
          AND auth_users_id = (
            SELECT auth_users_id
            FROM auth_users
            WHERE email = %(email)s
          )
    """, {'email': email})

        message = AuthAPIKeyResetMessage(to=email, full_name=user.full_name())
        if not send_email(message):
            raise McAuthProfileException("Unable to send email about reset API key.")

    except Exception:
        # Don't leave the transaction open, no matter whether it was a query or the email that failed
        db.rollback()
        raise

    db.commit()
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:48,代码来源:profile.py


示例15: _get_story_with_most_sentences

def _get_story_with_most_sentences(db: DatabaseHandler, stories: list) -> dict:
    """Pick the story that has the most rows in story_sentences out of a non-empty list of stories.

    A single-story list is returned as is without querying. If the query does not return any story, the first
    story of the list is returned."""
    assert len(stories) > 0

    if len(stories) == 1:
        return stories[0]

    candidate_ids = [candidate['stories_id'] for candidate in stories]

    longest_story = db.query(
        """
        select s.*
            from stories s
            where stories_id in (
                select stories_id
                    from story_sentences
                    where stories_id = any (%(a)s)
                    group by stories_id
                    order by count(*) desc
                    limit 1
            )
        """,
        {'a': candidate_ids}).hash()

    return stories[0] if longest_story is None else longest_story
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:26,代码来源:stories.py


示例16: _get_seeded_content

def _get_seeded_content(db: DatabaseHandler, topic_fetch_url: dict) -> typing.Optional[FetchLinkResponse]:
    """Build a response out of content stored in topic_seed_urls for this url and topic, if there is any.

    Arguments:
    db - db handle
    topic_fetch_url - topic_fetch_url dict from db

    Returns:
    dummy successful (HTTP 200) response object carrying the first matching content, or None if no row with
    non-null content matches

    """
    seeded_contents = db.query(
        "select content from topic_seed_urls where topics_id = %(a)s and url = %(b)s and content is not null",
        {'a': topic_fetch_url['topics_id'], 'b': topic_fetch_url['url']}).flat()

    if len(seeded_contents) == 0:
        return None

    fetch_url = topic_fetch_url['url']
    ok_status = HTTPStatus.OK

    return FetchLinkResponse(
        url=fetch_url,
        is_success=True,
        code=ok_status.value,
        message=ok_status.phrase,
        content=seeded_contents[0],
        last_requested_url=topic_fetch_url['url'],
    )
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:26,代码来源:fetch_link.py


示例17: get_spider_feed

def get_spider_feed(db: DatabaseHandler, medium: dict) -> dict:
    """Return the media source's feed named SPIDER_FEED_NAME, creating it if it is not there yet.

    A newly created feed is inactive and uses the medium's url with '#spiderfeed' appended as its url."""

    lookup_params = {'a': medium['media_id'], 'b': SPIDER_FEED_NAME}
    existing_feed = db.query(
        "select * from feeds where media_id = %(a)s and name = %(b)s",
        lookup_params).hash()

    if existing_feed is None:
        new_feed = {
            'media_id': medium['media_id'],
            'url': medium['url'] + '#spiderfeed',
            'name': SPIDER_FEED_NAME,
            'active': False,
        }
        return db.find_or_create('feeds', new_feed)

    return existing_feed
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:16,代码来源:stories.py


示例18: _get_ap_dup_sentence_lengths

def _get_ap_dup_sentence_lengths(db: DatabaseHandler, story_text: str) -> List[int]:
    """Return lengths of the story_sentences rows of the AP media source that match sentences of the story text.

    Sentences are matched by the MD5 hash of their UTF-8 encoding. An empty list is returned if
    _get_ap_media_id() does not return a media ID."""
    story_text = decode_object_from_bytes_if_needed(story_text)

    ap_media_id = _get_ap_media_id(db=db)
    if ap_media_id is None:
        return []

    sentence_md5s = [
        hashlib.md5(sentence.encode('utf-8')).hexdigest()
        for sentence in _get_sentences_from_content(story_text=story_text)
    ]

    query_params = {
        'ap_media_id': ap_media_id,
        'md5s': sentence_md5s,
    }
    matched_lengths = db.query("""
        SELECT length(sentence) AS len
        FROM story_sentences
        WHERE media_id = %(ap_media_id)s

          -- FIXME this probably never worked because the index is half_md5(), not md5()
          AND md5(sentence) = ANY(%(md5s)s)
    """, query_params).flat()

    # MC_REWRITE_TO_PYTHON: Perlism
    return [] if matched_lengths is None else matched_lengths
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:32,代码来源:ap.py


示例19: lookup_tag

def lookup_tag(db: DatabaseHandler, tag_name: str) -> Optional[Dict[str, Any]]:
    """Find a tag given its name in the "tag_set:tag" format.

    Returns None (after logging a warning) if the name is empty or is not in the "tag_set:tag" format, the
    matching tags row if there is one, and an empty dict if the name is well-formed but no tag matches."""
    tag_name = decode_object_from_bytes_if_needed(tag_name)

    if not tag_name:
        log.warning("Tag name is empty.")
        return None

    # Exactly one colon is expected, separating the tag set name from the tag
    if re.match(pattern='^([^:]*):([^:]*)$', string=tag_name) is None:
        log.warning("Unable to parse tag name '{}'.".format(tag_name))
        return None

    tag_set_name, tag = tag_name.split(':')

    matching_tag = db.query("""
        SELECT t.*
        FROM tags AS t,
             tag_sets AS ts
        WHERE t.tag_sets_id = ts.tag_sets_id
          AND t.tag = %(tag)s
          AND ts.name = %(tag_set_name)s
    """, {'tag': tag, 'tag_set_name': tag_set_name}).hash()

    # MC_REWRITE_TO_PYTHON: Perlism
    return {} if matching_tag is None else matching_tag
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:tags.py


示例20: _get_deduped_medium

def _get_deduped_medium(db: DatabaseHandler, media_id: int) -> dict:
    """Return the medium with the given ID or, if it is a duplicate, the medium at the end of its dup_media_id chain.

    The chain is followed recursively until a medium without a dup_media_id is reached."""
    medium = db.require_by_id('media', media_id)

    parent_media_id = medium['dup_media_id']
    if parent_media_id is not None:
        # This medium is a duplicate of another one, so resolve that one instead
        return _get_deduped_medium(db, parent_media_id)

    return medium
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:7,代码来源:stories.py



注:本文中的mediawords.db.DatabaseHandler类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python perl.decode_object_from_bytes_if_needed函数代码示例发布时间:2022-05-27
下一篇:
Python tal.getTAL函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap