def create_repository( app, name, type, description, long_description, user_id, category_ids=[], remote_repository_url=None, homepage_url=None ):
    """Create a new ToolShed repository"""
    sa_session = app.model.context.current
    # Add the repository record to the database.
    repository = app.model.Repository( name=name,
                                       type=type,
                                       remote_repository_url=remote_repository_url,
                                       homepage_url=homepage_url,
                                       description=description,
                                       long_description=long_description,
                                       user_id=user_id )
    # Flush to get the id.
    sa_session.add( repository )
    sa_session.flush()
    # Create an admin role for the repository.
    create_repository_admin_role( app, repository )
    # Determine the repository's repo_path on disk.
    dir = os.path.join( app.config.file_path, *directory_hash_id( repository.id ) )
    # Create directory if it does not exist.
    if not os.path.exists( dir ):
        os.makedirs( dir )
    # Define repo name inside hashed directory.
    repository_path = os.path.join( dir, "repo_%d" % repository.id )
    # Create local repository directory.
    if not os.path.exists( repository_path ):
        os.makedirs( repository_path )
    # Create the local repository.
    hg_util.get_repo_for_repository( app, repository=None, repo_path=repository_path, create=True )
    # Add an entry in the hgweb.config file for the local repository.
    lhs = "repos/%s/%s" % ( repository.user.username, repository.name )
    app.hgweb_config_manager.add_entry( lhs, repository_path )
    # Create a .hg/hgrc file for the local repository.
    hg_util.create_hgrc_file( app, repository )
    flush_needed = False
    if category_ids:
        # Create category associations
        for category_id in category_ids:
            category = sa_session.query( app.model.Category ) \
                                 .get( app.security.decode_id( category_id ) )
            rca = app.model.RepositoryCategoryAssociation( repository, category )
            sa_session.add( rca )
            flush_needed = True
    if flush_needed:
        sa_session.flush()
    # Update the repository registry.
    app.repository_registry.add_entry( repository )
    message = "Repository <b>%s</b> has been created." % escape( str( repository.name ) )
    return repository, message
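

# Illustrative sketch (not part of the original module): the path logic above combines the
# hashed id segments from directory_hash_id() with a "repo_<id>" leaf directory, so a
# repository lands at <file_path>/<hash segments>/repo_<id> on disk. The helper below only
# recombines those pieces for a given id; the exact hashing scheme is whatever
# directory_hash_id() implements.
def _example_repo_path( file_path, repository_id ):
    hashed_dir = os.path.join( file_path, *directory_hash_id( repository_id ) )
    return os.path.join( hashed_dir, "repo_%d" % repository_id )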


def get_version_lineage_for_tool( self, repository_id, repository_metadata, guid ):
    """
    Return the tool version lineage chain in descendant order for the received
    guid contained in the received repository_metadata.tool_versions. This function
    is called only from the Tool Shed.
    """
    repository = suc.get_repository_by_id( self.app, repository_id )
    repo = hg_util.get_repo_for_repository( self.app, repository=repository, repo_path=None, create=False )
    # Initialize the tool lineage
    version_lineage = [ guid ]
    # Get all ancestor guids of the received guid.
    current_child_guid = guid
    for changeset in hg_util.reversed_upper_bounded_changelog( repo, repository_metadata.changeset_revision ):
        ctx = repo.changectx( changeset )
        rm = suc.get_repository_metadata_by_changeset_revision( self.app, repository_id, str( ctx ) )
        if rm:
            parent_guid = rm.tool_versions.get( current_child_guid, None )
            if parent_guid:
                version_lineage.append( parent_guid )
                current_child_guid = parent_guid
    # Get all descendant guids of the received guid.
    current_parent_guid = guid
    for changeset in hg_util.reversed_lower_upper_bounded_changelog( repo,
                                                                     repository_metadata.changeset_revision,
                                                                     repository.tip( self.app ) ):
        ctx = repo.changectx( changeset )
        rm = suc.get_repository_metadata_by_changeset_revision( self.app, repository_id, str( ctx ) )
        if rm:
            tool_versions = rm.tool_versions
            for child_guid, parent_guid in tool_versions.items():
                if parent_guid == current_parent_guid:
                    version_lineage.insert( 0, child_guid )
                    current_parent_guid = child_guid
                    break
    return version_lineage
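

# Illustrative sketch (assumed data shape, not part of the original module): each
# repository_metadata.tool_versions dict maps a tool guid to the guid of its immediate
# parent version, so a lineage is a chain of dictionary lookups. The toy helper below
# performs the ancestor half of that walk over a single dict.
def _example_walk_ancestors( tool_versions, guid ):
    lineage = [ guid ]
    while guid in tool_versions:
        guid = tool_versions[ guid ]
        lineage.append( guid )
    return lineage

# _example_walk_ancestors( { 'guid_v3': 'guid_v2', 'guid_v2': 'guid_v1' }, 'guid_v3' )
# returns [ 'guid_v3', 'guid_v2', 'guid_v1' ], i.e. newest first (descendant order).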


def get_latest_downloadable_repository_metadata( trans, repository ):
    """
    Return the latest downloadable repository_metadata record for the received repository. This will
    return repositories of type unrestricted as well as types repository_suite_definition and
    tool_dependency_definition.
    """
    encoded_repository_id = trans.security.encode_id( repository.id )
    repo = hg_util.get_repo_for_repository( trans.app, repository=repository, repo_path=None, create=False )
    tip_ctx = str( repo.changectx( repo.changelog.tip() ) )
    repository_metadata = None
    try:
        repository_metadata = suc.get_repository_metadata_by_changeset_revision( trans.app, encoded_repository_id, tip_ctx )
        if repository_metadata is not None and repository_metadata.downloadable:
            return repository_metadata
        return None
    except:
        latest_downloadable_revision = metadata_util.get_previous_metadata_changeset_revision( repository,
                                                                                               repo,
                                                                                               tip_ctx,
                                                                                               downloadable=True )
        if latest_downloadable_revision == hg_util.INITIAL_CHANGELOG_HASH:
            return None
        repository_metadata = suc.get_repository_metadata_by_changeset_revision( trans.app,
                                                                                 encoded_repository_id,
                                                                                 latest_downloadable_revision )
        if repository_metadata is not None and repository_metadata.downloadable:
            return repository_metadata
        return None


def get_ordered_installable_revisions( self, trans, name, owner, **kwd ):
    """
    GET /api/repositories/get_ordered_installable_revisions

    :param name: the name of the Repository
    :param owner: the owner of the Repository

    Returns the ordered list of changeset revision hash strings that are associated with installable revisions.
    As in the changelog, the list is ordered oldest to newest.
    """
    # Example URL: http://localhost:9009/api/repositories/get_ordered_installable_revisions?name=add_column&owner=test
    if name and owner:
        # Get the repository information.
        repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
        if repository is None:
            error_message = "Error in the Tool Shed repositories API in get_ordered_installable_revisions: "
            error_message += "cannot locate repository %s owned by %s." % ( str( name ), str( owner ) )
            log.debug( error_message )
            return []
        repo = hg_util.get_repo_for_repository( trans.app, repository=repository, repo_path=None, create=False )
        ordered_installable_revisions = suc.get_ordered_metadata_changeset_revisions( repository, repo, downloadable=True )
        return ordered_installable_revisions
    else:
        error_message = "Error in the Tool Shed repositories API in get_ordered_installable_revisions: "
        error_message += "invalid name %s or owner %s received." % ( str( name ), str( owner ) )
        log.debug( error_message )
        return []
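

# Illustrative client-side call (hypothetical helper, not part of the original module):
# the endpoint documented above can be exercised with any HTTP client. The host, port and
# the `requests` dependency below are assumptions that mirror the example URL in the
# comment above.
def _example_fetch_installable_revisions( tool_shed_url, name, owner ):
    import requests
    url = '%s/api/repositories/get_ordered_installable_revisions' % tool_shed_url.rstrip( '/' )
    response = requests.get( url, params={ 'name': name, 'owner': owner } )
    # The API returns a JSON list of changeset revision hashes, ordered oldest to newest.
    return response.json()

# _example_fetch_installable_revisions( 'http://localhost:9009', 'add_column', 'test' )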


def build_readme_files_dict( app, repository, changeset_revision, metadata, tool_path=None ):
    """
    Return a dictionary of valid readme file name <-> readme file content pairs for all readme files defined in the received metadata. Since the
    received changeset_revision (which is associated with the received metadata) may not be the latest installable changeset revision, the README
    file contents may not be available on disk. This method is used by both Galaxy and the Tool Shed.
    """
    if app.name == 'galaxy':
        can_use_disk_files = True
    else:
        repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
        latest_downloadable_changeset_revision = suc.get_latest_downloadable_changeset_revision( app, repository, repo )
        can_use_disk_files = changeset_revision == latest_downloadable_changeset_revision
    readme_files_dict = {}
    if metadata:
        if 'readme_files' in metadata:
            for relative_path_to_readme_file in metadata[ 'readme_files' ]:
                readme_file_name = os.path.split( relative_path_to_readme_file )[ 1 ]
                if can_use_disk_files:
                    if tool_path:
                        full_path_to_readme_file = os.path.abspath( os.path.join( tool_path, relative_path_to_readme_file ) )
                    else:
                        full_path_to_readme_file = os.path.abspath( relative_path_to_readme_file )
                    text = None
                    try:
                        f = open( full_path_to_readme_file, 'r' )
                        text = unicodify( f.read() )
                        f.close()
                    except Exception, e:
                        log.exception( "Error reading README file '%s' from disk: %s" % ( str( relative_path_to_readme_file ), str( e ) ) )
                        text = None
                    if text:
                        text_of_reasonable_length = basic_util.size_string( text )
                        if text_of_reasonable_length.find( '.. image:: ' ) >= 0:
                            # Handle image display for README files that are contained in repositories in the tool shed or installed into Galaxy.
                            lock = threading.Lock()
                            lock.acquire( True )
                            try:
                                text_of_reasonable_length = suc.set_image_paths( app,
                                                                                 app.security.encode_id( repository.id ),
                                                                                 text_of_reasonable_length )
                            except Exception, e:
                                log.exception( "Exception in build_readme_files_dict, so images may not be properly displayed:\n%s" % str( e ) )
                            finally:
                                lock.release()
                        if readme_file_name.endswith( '.rst' ):
                            text_of_reasonable_length = Template( rst_to_html( text_of_reasonable_length ),
                                                                  input_encoding='utf-8',
                                                                  output_encoding='utf-8',
                                                                  default_filters=[ 'decode.utf8' ],
                                                                  encoding_errors='replace' )
                            text_of_reasonable_length = text_of_reasonable_length.render( static_path=web.url_for( '/static' ),
                                                                                          host_url=web.url_for( '/', qualified=True ) )
                            text_of_reasonable_length = unicodify( text_of_reasonable_length )
                        else:
                            text_of_reasonable_length = basic_util.to_html_string( text_of_reasonable_length )
                        readme_files_dict[ readme_file_name ] = text_of_reasonable_length
    return readme_files_dict
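

# Illustrative consumption of the dict built above (hypothetical caller, not part of the
# original module): keys are bare README file names and values are HTML-safe text, so a
# caller can render each entry directly.
def _example_render_readmes( readme_files_dict ):
    rendered = []
    for readme_file_name, html_text in readme_files_dict.items():
        rendered.append( '<h3>%s</h3>\n%s' % ( readme_file_name, html_text ) )
    return '\n'.join( rendered )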


def get_repo_info_dict( app, user, repository_id, changeset_revision ):
    """
    Return a repo_info_dict for the received changeset_revision of the repository, along with
    booleans describing whether the revision includes tools, tool dependencies, repository
    dependencies, and tools for display in the tool panel.
    """
    repository = suc.get_repository_in_tool_shed( app, repository_id )
    repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
    repository_clone_url = common_util.generate_clone_url_for_repository_in_tool_shed( user, repository )
    repository_metadata = suc.get_repository_metadata_by_changeset_revision( app,
                                                                             repository_id,
                                                                             changeset_revision )
    if not repository_metadata:
        # The received changeset_revision is no longer installable, so get the next changeset_revision
        # in the repository's changelog. This generally occurs only with repositories of type
        # repository_suite_definition or tool_dependency_definition.
        next_downloadable_changeset_revision = \
            suc.get_next_downloadable_changeset_revision( repository, repo, changeset_revision )
        if next_downloadable_changeset_revision:
            repository_metadata = suc.get_repository_metadata_by_changeset_revision( app,
                                                                                     repository_id,
                                                                                     next_downloadable_changeset_revision )
    if repository_metadata:
        # For now, we'll always assume that we'll get repository_metadata, but if we discover our assumption
        # is not valid we'll have to enhance the callers to handle repository_metadata values of None in the
        # returned repo_info_dict.
        metadata = repository_metadata.metadata
        if 'tools' in metadata:
            includes_tools = True
        else:
            includes_tools = False
        includes_tools_for_display_in_tool_panel = repository_metadata.includes_tools_for_display_in_tool_panel
        repository_dependencies_dict = metadata.get( 'repository_dependencies', {} )
        repository_dependencies = repository_dependencies_dict.get( 'repository_dependencies', [] )
        has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td = \
            suc.get_repository_dependency_types( repository_dependencies )
        if 'tool_dependencies' in metadata:
            includes_tool_dependencies = True
        else:
            includes_tool_dependencies = False
    else:
        # Here's where we may have to handle enhancements to the callers. See above comment.
        includes_tools = False
        has_repository_dependencies = False
        has_repository_dependencies_only_if_compiling_contained_td = False
        includes_tool_dependencies = False
        includes_tools_for_display_in_tool_panel = False
    ctx = hg_util.get_changectx_for_changeset( repo, changeset_revision )
    repo_info_dict = create_repo_info_dict( app=app,
                                            repository_clone_url=repository_clone_url,
                                            changeset_revision=changeset_revision,
                                            ctx_rev=str( ctx.rev() ),
                                            repository_owner=repository.user.username,
                                            repository_name=repository.name,
                                            repository=repository,
                                            repository_metadata=repository_metadata,
                                            tool_dependencies=None,
                                            repository_dependencies=None )
    return repo_info_dict, includes_tools, includes_tool_dependencies, includes_tools_for_display_in_tool_panel, \
        has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td
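

# Illustrative unpacking of the six-item return value above (hypothetical caller, not part
# of the original module):
#
#     ( repo_info_dict, includes_tools, includes_tool_dependencies,
#       includes_tools_for_display_in_tool_panel, has_repository_dependencies,
#       has_repository_dependencies_only_if_compiling_contained_td ) = \
#         get_repo_info_dict( app, user, repository_id, changeset_revision )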


def has_previous_repository_reviews( app, repository, changeset_revision ):
    """
    Determine if a repository has a changeset revision review prior to the
    received changeset revision.
    """
    repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
    reviewed_revision_hashes = [ review.changeset_revision for review in repository.reviews ]
    for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ):
        previous_changeset_revision = str( repo.changectx( changeset ) )
        if previous_changeset_revision in reviewed_revision_hashes:
            return True
    return False


def get_certified_level_one_tuple( self, repository ):
    """
    Return a tuple of ( latest_installable_changeset_revision, certified ), where certified is
    True only if the latest installable changeset_revision of the received repository is level
    one certified.
    """
    if repository is None:
        return ( None, False )
    if repository.deleted or repository.deprecated:
        return ( None, False )
    repo = hg_util.get_repo_for_repository( self.app, repository=repository, repo_path=None, create=False )
    # Get the latest installable changeset revision since that is all that is currently configured for testing.
    latest_installable_changeset_revision = suc.get_latest_downloadable_changeset_revision( self.app, repository, repo )
    if latest_installable_changeset_revision not in [ None, hg_util.INITIAL_CHANGELOG_HASH ]:
        encoded_repository_id = self.app.security.encode_id( repository.id )
        repository_metadata = suc.get_repository_metadata_by_changeset_revision( self.app,
                                                                                 encoded_repository_id,
                                                                                 latest_installable_changeset_revision )
        if repository_metadata:
            # Filter out repository revisions that have not been tested.
            if repository_metadata.time_last_tested is not None and repository_metadata.tool_test_results is not None:
                if repository.type in [ rt_util.REPOSITORY_SUITE_DEFINITION, rt_util.TOOL_DEPENDENCY_DEFINITION ]:
                    # Look in the tool_test_results dictionary for installation errors.
                    try:
                        tool_test_results_dict = repository_metadata.tool_test_results[ 0 ]
                    except Exception, e:
                        message = 'Error attempting to retrieve install and test results for repository %s:\n' % str( repository.name )
                        message += '%s' % str( e )
                        log.exception( message )
                        return ( latest_installable_changeset_revision, False )
                    if 'installation_errors' in tool_test_results_dict:
                        return ( latest_installable_changeset_revision, False )
                    return ( latest_installable_changeset_revision, True )
                else:
                    # We have a repository with type Unrestricted.
                    if repository_metadata.includes_tools:
                        if repository_metadata.tools_functionally_correct:
                            return ( latest_installable_changeset_revision, True )
                        return ( latest_installable_changeset_revision, False )
                    else:
                        # Look in the tool_test_results dictionary for installation errors.
                        try:
                            tool_test_results_dict = repository_metadata.tool_test_results[ 0 ]
                        except Exception, e:
                            message = 'Error attempting to retrieve install and test results for repository %s:\n' % str( repository.name )
                            message += '%s' % str( e )
                            log.exception( message )
                            return ( latest_installable_changeset_revision, False )
                        if 'installation_errors' in tool_test_results_dict:
                            return ( latest_installable_changeset_revision, False )
                        return ( latest_installable_changeset_revision, True )
            else:
                # No test results.
                return ( latest_installable_changeset_revision, False )
    # No installable changeset revision or no repository metadata.
    return ( None, False )
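

# Illustrative use of the tuple returned above (hypothetical helper, not part of the
# original module): callers unpack the pair and treat the boolean as the certification
# badge for the latest installable revision.
def _example_certification_label( certified_tuple ):
    latest_installable_changeset_revision, is_level_one_certified = certified_tuple
    if latest_installable_changeset_revision is None:
        return 'no installable revisions'
    if is_level_one_certified:
        return 'revision %s is level one certified' % latest_installable_changeset_revision
    return 'revision %s is not level one certified' % latest_installable_changeset_revision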


def handle_directory_changes( app, host, username, repository, full_path, filenames_in_archive, remove_repo_files_not_in_tar,
                              new_repo_alert, commit_message, undesirable_dirs_removed, undesirable_files_removed ):
    repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
    content_alert_str = ''
    files_to_remove = []
    filenames_in_archive = [ os.path.join( full_path, name ) for name in filenames_in_archive ]
    if remove_repo_files_not_in_tar and not repository.is_new( app ):
        # We have a repository that is not new (it contains files), so discover those files that are in the
        # repository, but not in the uploaded archive.
        for root, dirs, files in os.walk( full_path ):
            if root.find( '.hg' ) < 0 and root.find( 'hgrc' ) < 0:
                for undesirable_dir in UNDESIRABLE_DIRS:
                    if undesirable_dir in dirs:
                        dirs.remove( undesirable_dir )
                        undesirable_dirs_removed += 1
                for undesirable_file in UNDESIRABLE_FILES:
                    if undesirable_file in files:
                        files.remove( undesirable_file )
                        undesirable_files_removed += 1
                for name in files:
                    full_name = os.path.join( root, name )
                    if full_name not in filenames_in_archive:
                        files_to_remove.append( full_name )
        for repo_file in files_to_remove:
            # Remove files in the repository (relative to the upload point) that are not in
            # the uploaded archive.
            try:
                hg_util.remove_file( repo.ui, repo, repo_file, force=True )
            except Exception, e:
                log.debug( "Error removing files using the mercurial API, so trying a different approach, the error was: %s" % str( e ) )
                relative_selected_file = repo_file.split( 'repo_%d' % repository.id )[ 1 ].lstrip( '/' )
                repo.dirstate.remove( relative_selected_file )
                repo.dirstate.write()
                absolute_selected_file = os.path.abspath( repo_file )
                if os.path.isdir( absolute_selected_file ):
                    try:
                        os.rmdir( absolute_selected_file )
                    except OSError, e:
                        # The directory is not empty.
                        pass
                elif os.path.isfile( absolute_selected_file ):
                    os.remove( absolute_selected_file )
                    dir = os.path.split( absolute_selected_file )[ 0 ]
                    try:
                        os.rmdir( dir )
                    except OSError, e:
                        # The directory is not empty.
                        pass


def should_set_do_not_test_flag( app, repository, changeset_revision, testable_revision ):
    """
    The received testable_revision is True if the tool has defined tests and test files are in the repository.
    This method returns True if the received repository has multiple downloadable revisions and the received
    changeset_revision is not the most recent downloadable revision and the received testable_revision is False.
    In this case, the received changeset_revision will never be updated with correct data, and re-testing it
    would be redundant.
    """
    if not testable_revision:
        repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
        changeset_revisions = suc.get_ordered_metadata_changeset_revisions( repository, repo, downloadable=True )
        if len( changeset_revisions ) > 1:
            latest_downloadable_revision = changeset_revisions[ -1 ]
            if changeset_revision != latest_downloadable_revision:
                return True
    return False
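

# Illustrative restatement of the decision above (toy helper, not part of the original
# module): downloadable_revisions is assumed to be the ordered list of downloadable
# changeset revision hashes, oldest to newest, as returned by
# suc.get_ordered_metadata_changeset_revisions().
def _example_should_flag( testable_revision, downloadable_revisions, changeset_revision ):
    if testable_revision:
        return False
    if len( downloadable_revisions ) > 1 and changeset_revision != downloadable_revisions[ -1 ]:
        return True
    return False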