This article collects typical usage examples of Python's urlparse.urldefrag function. If you have been wondering what urldefrag does, how to use it, or what calling it looks like in practice, the curated code examples below may help.
Twenty code examples of urldefrag are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Python code examples.
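Before the project examples, here is a minimal standalone sketch of what urldefrag does. The URL values are purely illustrative; the examples in this article use Python 2's urlparse module, while in Python 3 the same function lives in urllib.parse.
# Minimal sketch: urldefrag splits a URL into (url without fragment, fragment).
import urlparse  # Python 2; in Python 3: from urllib import parse as urlparse
clean, frag = urlparse.urldefrag("http://site.ru/1.html#section-4")
print clean  # -> http://site.ru/1.html
print frag   # -> section-4
# A URL without a fragment is returned unchanged, paired with an empty fragment.
print urlparse.urldefrag("http://site.ru/1.html")  # -> ('http://site.ru/1.html', '')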
Example 1: make_correct_link
def make_correct_link(base, link):
"""
Normalizes a link against a base URL:
http://...  -- passed through unchanged
/...        -- the site root is prepended
www. ...    -- http:// is prepended
smth.html   -- the full base path is prepended
Intended as a wrapper over urlparse's functions that handles all common
forms of URL, both absolute and relative, and strips any fragment
(e.g. http://site.ru/1.html#4 -> http://site.ru/1.html).
"""
defrag_link, _ = urlparse.urldefrag(link)
defrag_base, _ = urlparse.urldefrag(base)
# case: 'g.html' on 'http://ya.ru/a' ==> 'http://ya.ru/a/g.html'
# (a slash is appended after 'a' if it looks like an unslashed folder:
#  an empty query string (so nothing like 'a.php?set=1'), no dots,
#  and no trailing slash)
scheme, netloc, url, params, query, fragment = urlparse.urlparse(defrag_base)
if url and not query and not re.search(r"/$", url) and not re.search(r"\.", url):
url += '/'
defrag_base = urlparse.urlunparse( (scheme, netloc, url, params, query, fragment) )
#just rejoining all parts
return_link = urlparse.urljoin(defrag_base, defrag_link)
return return_link
Developer: ktisha, Project: ebook-service, Lines: 29, Source: helpers.py
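A quick, hypothetical call of the helper above; the base and link values are made up for illustration and assume the module-level import re and import urlparse used by the original project.
# Hypothetical usage of make_correct_link (values are illustrative only).
base = "http://ya.ru/a"  # an "unslashed folder" style base URL
print make_correct_link(base, "g.html")       # -> http://ya.ru/a/g.html
print make_correct_link(base, "/top.html")    # -> http://ya.ru/top.html
print make_correct_link(base, "g.html#frag")  # -> http://ya.ru/a/g.html (fragment stripped)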
Example 2: crawlWeb
def crawlWeb(UrlafterConnect,keyword):
if not UrlafterConnect:
print("Url is empty")
return list()
#Get all the links
soup = BeautifulSoup(UrlafterConnect)
urllist = []
# crawl the page's links only if the keyword occurs somewhere on it
if re.search(keyword, str(soup), re.IGNORECASE) is not None:
for link in soup.find_all('a', href=True):
crawl = link.get('href')
crawl_url = crawl.encode('utf-8')
if not crawl_url:
continue
# relative links under /wiki: convert them to absolute http form
if crawl_url.startswith('/wiki'):
if (crawl_url.find(':') == -1) and (crawl_url != "/wiki/Main_Page"):
crawl_url = urlparse.urljoin("http://en.wikipedia.org",crawl_url)
crawl_url, frag = urlparse.urldefrag(crawl_url)
urllist.append(crawl_url)
else:
#Get only wiki links without colons in it and not redirecting to main page
if crawl_url.startswith('http://en.wikipedia.org'):
if crawl_url != "http://en.wikipedia.org/wiki/Main_Page":
s = "http://en"
crawl = crawl_url.lstrip("http://en")
if crawl.find(':') == -1:
crawl_url, frag = urlparse.urldefrag(crawl_url)
urllist.append(crawl_url)
#Remove duplicate entries from the list while returning
return list(set(urllist))
Developer: zbxzc35, Project: WebCrawler, Lines: 31, Source: WebCrawler.py
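A hypothetical driver for crawlWeb; the start URL and keyword are illustrative, and the fetch is done with urllib2 only to obtain the HTML string the function expects (the project's own imports of re, urlparse and BeautifulSoup are assumed).
# Hypothetical driver: download a page, then extract keyword-relevant wiki links.
import urllib2  # Python 2 standard library
html = urllib2.urlopen("http://en.wikipedia.org/wiki/Information_retrieval").read()
links = crawlWeb(html, "information")  # keyword match is case-insensitive
for link in links[:10]:
    print link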
Example 3: _toc_from_html
def _toc_from_html(self, opf):
if 'toc' not in self.oeb.guide:
return False
self.log.debug('Reading TOC from HTML...')
itempath, frag = urldefrag(self.oeb.guide['toc'].href)
item = self.oeb.manifest.hrefs[itempath]
html = item.data
if frag:
elems = xpath(html, './/*[@id="%s"]' % frag)
if not elems:
elems = xpath(html, './/*[@name="%s"]' % frag)
elem = elems[0] if elems else html
while elem != html and not xpath(elem, './/h:a[@href]'):
elem = elem.getparent()
html = elem
titles = defaultdict(list)
order = []
for anchor in xpath(html, './/h:a[@href]'):
href = anchor.attrib['href']
href = item.abshref(urlnormalize(href))
path, frag = urldefrag(href)
if path not in self.oeb.manifest.hrefs:
continue
title = xml2text(anchor)
title = COLLAPSE_RE.sub(' ', title.strip())
if href not in titles:
order.append(href)
titles[href].append(title)
toc = self.oeb.toc
for href in order:
toc.add(' '.join(titles[href]), href)
return True
Developer: mihailim, Project: calibre, Lines: 32, Source: reader.py
Example 4: get_links
def get_links(url, depth, atmost_count):
urldfg = urlparse.urldefrag(url)
url = urldfg[0]
urls_list = []
myopener = MyOpener()
try:
page = myopener.open(url)
except:
return []
text = page.read()
page.close()
url_parsed = urlparse.urlparse(url)
domain_name_url_arr = url_parsed.netloc.split(".")
soup = BeautifulSoup(text, "html.parser")
for tag in soup.findAll('a', href=True):
if atmost_count == 0:
break;
tag['href'] = urlparse.urljoin(url, tag['href'])
new_url = urlparse.urldefrag(tag['href'])[0]
new_url_parsed = urlparse.urlparse(new_url)
domain_name_new_url_arr = new_url_parsed.netloc.split('.');
if len(domain_name_url_arr) >= 2 and len(domain_name_new_url_arr) >= 2:
if domain_name_url_arr[-1] != domain_name_new_url_arr[-1] or domain_name_url_arr[-2] != domain_name_new_url_arr[-2]:
continue;
else:
continue;
if new_url[-4:] == '.pdf':
continue;
if new_url not in urls_list:
urls_list.append([new_url, depth + 1])
atmost_count -= 1;
return urls_list
Developer: aashray, Project: dist_crawl, Lines: 33, Source: main_handler.py
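A hypothetical call of get_links; the start URL and the limit are illustrative, and MyOpener is assumed to be the URL-opener class defined elsewhere in the original project.
# Hypothetical call: collect up to 5 same-domain, non-PDF links from the start page.
for new_url, new_depth in get_links("http://en.wikipedia.org/wiki/Web_crawler", 0, 5):
    print new_depth, new_url  # each returned entry is [url, depth + 1]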
Example 5: __init__
def __init__(self, toc, j, renderlist, redirects):
self.typedoc = StringIO.StringIO()
self.toc = toc
self.subs = {} # type: Dict
self.docParent = {} # type: Dict
self.docAfter = {} # type: Dict
self.rendered = set() # type: Set
self.redirects = redirects
self.title = None # type: str
for t in j:
if "extends" in t:
for e in aslist(t["extends"]):
add_dictlist(self.subs, e, t["name"])
#if "docParent" not in t and "docAfter" not in t:
# add_dictlist(self.docParent, e, t["name"])
if t.get("docParent"):
add_dictlist(self.docParent, t["docParent"], t["name"])
if t.get("docChild"):
for c in aslist(t["docChild"]):
add_dictlist(self.docParent, t["name"], c)
if t.get("docAfter"):
add_dictlist(self.docAfter, t["docAfter"], t["name"])
_, _, metaschema_loader = schema.get_metaschema()
alltypes = schema.extend_and_specialize(j, metaschema_loader)
self.typemap = {} # type: Dict
self.uses = {} # type: Dict
self.record_refs = {} # type: Dict
for t in alltypes:
self.typemap[t["name"]] = t
try:
if t["type"] == "record":
self.record_refs[t["name"]] = []
for f in t.get("fields", []):
p = has_types(f)
for tp in p:
if tp not in self.uses:
self.uses[tp] = []
if (t["name"], f["name"]) not in self.uses[tp]:
_, frg1 = urlparse.urldefrag(t["name"])
_, frg2 = urlparse.urldefrag(f["name"])
self.uses[tp].append((frg1, frg2))
if tp not in basicTypes and tp not in self.record_refs[t["name"]]:
self.record_refs[t["name"]].append(tp)
except KeyError as e:
_logger.error("Did not find 'type' in %s", t)
raise
for f in alltypes:
if (f["name"] in renderlist or
((not renderlist) and
("extends" not in f) and
("docParent" not in f) and
("docAfter" not in f))):
self.render_type(f, 1)
Developer: NBISweden, Project: cwltool, Lines: 60, Source: makedoc.py
Example 6: startElementNS
def startElementNS(self, name, qname, attrs):
stack = self.stack
stack.append(ElementHandler())
current = self.current
parent = self.parent
base = attrs.get(BASE, None)
if base is not None:
base, frag = urldefrag(base)
if parent and parent.base:
base = urljoin(parent.base, base)
else:
systemId = self.locator.getPublicId() or self.locator.getSystemId()
if systemId:
base = urljoin(systemId, base)
else:
if parent:
base = parent.base
if base is None:
systemId = self.locator.getPublicId() or self.locator.getSystemId()
if systemId:
base, frag = urldefrag(systemId)
current.base = base
language = attrs.get(LANG, None)
if language is None:
if parent:
language = parent.language
current.language = language
current.start(name, qname, attrs)
Developer: EmuxEvans, Project: SmartObject, Lines: 28, Source: rdfxml.py
Example 7: _urljoin
def _urljoin(base, url):
"""
Construct a full ("absolute") URL by combining a "base URL" with another
URL. Informally, this uses components of the base URL, in particular the
addressing scheme, the network location and (part of) the path, to provide
missing components in the relative URL.
Additionally, the fragment identifier is preserved according to the HTTP
1.1 bis draft.
@type base: C{bytes}
@param base: Base URL.
@type url: C{bytes}
@param url: URL to combine with C{base}.
@return: An absolute URL resulting from the combination of C{base} and
C{url}.
@see: L{urlparse.urljoin}
@see: U{https://tools.ietf.org/html/draft-ietf-httpbis-p2-semantics-22#section-7.1.2}
"""
base, baseFrag = urldefrag(base)
url, urlFrag = urldefrag(urljoin(base, url))
return urljoin(url, b'#' + (urlFrag or baseFrag))
Developer: schleichdi2, Project: OpenNfr_E2_Gui-5.3, Lines: 26, Source: client.py
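To make the fragment-preservation rule in the docstring concrete, here is a small illustrative session; the URLs are made up, and byte strings are used because the function documents C{bytes} arguments.
# Illustrative: a fragment on the relative URL wins; otherwise the base's fragment is kept.
print _urljoin(b"http://example.com/a#top", b"b.html")       # -> http://example.com/b.html#top
print _urljoin(b"http://example.com/a#top", b"b.html#sec2")  # -> http://example.com/b.html#sec2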
Example 8: parse_showings_table
def parse_showings_table(self, response):
movie_title = response.meta['movieTitle']
movie_url = response.meta['movieUrl']
showings_table_value = response.meta['showingsTableValue']
theater_url = response.meta['theaterUrl']
version = response.meta['version']
showings_table = response.xpath('//div[@class="cinema-movie clearfix"]/div[@value="' + showings_table_value + '"]')
at_least_one_showing_found = False
jump_links = showings_table.css('.jump-to-show').xpath('a')
if len(jump_links) >= 1:
jump_link = jump_links[-1]
if jump_link.xpath('text()').extract_first().endswith(u'>'):
jump_url = urldefrag(response.urljoin(jump_link.xpath('@href').extract_first()))[0]
request = scrapy.Request(jump_url, callback=self.parse_showings_table)
request.meta['movieTitle'] = movie_title
request.meta['movieUrl'] = movie_url
request.meta['showingsTableValue'] = showings_table_value
request.meta['theaterUrl'] = theater_url
request.meta['version'] = version
yield request
else:
for showings_column in showings_table.css('.cinema-movie-dates').xpath('li'):
for showing_cell in showings_column.xpath('ul/li/a'):
at_least_one_showing_found = True
dayAndMonth = showings_column.xpath('div[2]/text()').extract_first().split('/')
day = int(dayAndMonth[0])
month = int(dayAndMonth[1])
hourAndMinute = showing_cell.xpath('text()').extract_first().split(':')
hour = int(hourAndMinute[0])
minute = int(hourAndMinute[1])
#seating_info = showing_cell.xpath('@title').extract_first()[len('<div>'):len('</div>')]
seating_info = showing_cell.xpath('@title').extract_first()[len('<div>'):-len('</div>')].split('</div><div>')
date_obj = datetime(datetime.now().year, month, day, hour, minute)
if date_obj < datetime.now():
date_obj = datetime(datetime.now().year + 1, month, day, hour, minute)
showing = ShowingItem()
showing['movieTitle'] = movie_title
showing['movieUrl'] = movie_url
showing['theaterUrl'] = theater_url
showing['seatingInfo'] = seating_info
showing['showingUrl'] = response.urljoin(showing_cell.xpath('@href').extract_first())
showing['start'] = date_obj.strftime('%Y-%m-%dT%H:%M:00')
showing['version'] = version
yield showing
if at_least_one_showing_found:
next_page = showings_table.css('.showtimes-extra').xpath('a[last()]')
if next_page:
next_page_url = urldefrag(response.urljoin(next_page.xpath('@href')[0].extract()))[0]
request = scrapy.Request(next_page_url, callback=self.parse_showings_table)
request.meta['movieTitle'] = movie_title
request.meta['movieUrl'] = movie_url
request.meta['showingsTableValue'] = showings_table_value
request.meta['theaterUrl'] = theater_url
request.meta['version'] = version
yield request
Developer: janaagaard75, Project: FilmFilter, Lines: 59, Source: showings_spider.py
Example 9: get_links
def get_links(response):
if 300 <= response.status_code < 400 and response.headers['location']:
# redirect
yield urlparse.urldefrag(urlparse.urljoin(response.url, response.headers['location'], False))[0]
try:
html = beautify(response)
for i in html.findAll('a', href=True):
yield urlparse.urldefrag(urlparse.urljoin(response.url, i['href'], False))[0]
except NotHtmlException:
pass
Developer: mrcrabby, Project: eek, Lines: 10, Source: spider.py
Example 10: job
def job(self, joborder, basedir, output_callback, **kwargs):
# Validate job order
validate.validate_ex(self.names.get_name("input_record_schema", ""), joborder)
requirements = kwargs.get("requirements", []) + self.tool.get("requirements", [])
hints = kwargs.get("hints", []) + self.tool.get("hints", [])
steps = [makeTool(step, basedir) for step in self.tool.get("steps", [])]
random.shuffle(steps)
self.state = {}
self.processStatus = "success"
for i in self.tool["inputs"]:
(_, iid) = urlparse.urldefrag(i["id"])
if iid in joborder:
self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(joborder[iid]))
elif "default" in i:
self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(i["default"]))
else:
raise WorkflowException("Input '%s' not in input object and does not have a default value." % (i["id"]))
for s in steps:
for out in s.tool["outputs"]:
self.state[out["id"]] = None
s.completed = False
completed = 0
while completed < len(steps):
made_progress = False
completed = 0
for step in steps:
if step.completed:
completed += 1
else:
for newjob in self.try_make_job(step, basedir, requirements=requirements, hints=hints, **kwargs):
if newjob:
made_progress = True
yield newjob
if not made_progress and completed < len(steps):
yield None
wo = {}
for i in self.tool["outputs"]:
if "connect" in i:
(_, src) = urlparse.urldefrag(i['id'])
if i["connect"]["source"] not in self.state:
raise WorkflowException("Connect source '%s' on parameter '%s' does not exist" % (i["connect"]["source"], inp["id"]))
wo[src] = self.state[i["connect"]["source"]].value
output_callback(wo, self.processStatus)
Developer: porterjamesj, Project: common-workflow-language, Lines: 50, Source: workflow.py
Example 11: crawl_web
def crawl_web( scope, tocrawl, index, graph, url_info, limits = [-1, 0, 0.0, 1.0]): # returns index, graph of inlinks
tocrawl_next = [] # used for depth control
depth = 0
pages = 0
max_pages, max_depth, max_time, time_delay = limits
if max_time > 0.0: start_time = time()
while tocrawl or tocrawl_next:
if not tocrawl:
#
# Descend one more level (depth)
#
tocrawl = tocrawl_next
tocrawl_next = []
depth += 1
if max_depth >= 0 and depth > max_depth:
print 'Reached maximum depth. Interrupting crawler.'
break
page = tocrawl.pop(0)
# Remove fragment portion from the url
page = urlparse.urldefrag(page)[0]
if not page in graph:
pages += 1
print 'Crawling page:', page
if max_time != 0.0: print 'time = ', time()-start_time, ' max_time = ', max_time
if max_pages > 0:
print 'Pages crawled:', pages, 'max_pages = ', max_pages
# [ToDo:]Transform meta_data into a dictionary
text, outlinks, meta_data = get_page( page)
add_page_to_index( index, page, text)
# Need to filter outlinks only to current scope
outlinks = [ [urlparse.urldefrag(l[0])[0],l[1]] for l in outlinks if is_inscope( scope, l[0]) and (l[0].endswith('.html') or l[0].endswith('.htm')) ]
newlinks = [ urlparse.urldefrag(l[0])[0] for l in outlinks]
graph[page] = outlinks
url_info[page] = meta_data
tocrawl_next = list( set(tocrawl_next + newlinks))
if pages >= max_pages:
print 'Reached number of pages limit. Interrupting crawler.'
break
if max_time > 0.0 and time()-start_time > max_time:
print 'Reached time limit. Interrupting crawler.'
break
tocrawl = list( set(tocrawl + tocrawl_next))
return tocrawl, index, graph, url_info
Developer: edevaldo, Project: cs101-lupa, Lines: 48, Source: crawl.py
Example 12: __init__
def __init__(self, request, timeout=180):
self.url = urldefrag(request.url)[0]
self.method = request.method
self.body = request.body or None
self.headers = Headers(request.headers)
self.response_headers = None
self.timeout = request.meta.get('download_timeout') or timeout
self.start_time = time()
self.deferred = defer.Deferred().addCallback(self._build_response, request)
# Fixes Twisted 11.1.0+ support as HTTPClientFactory is expected
# to have _disconnectedDeferred. See Twisted r32329.
# As Scrapy implements its own logic to handle redirects, there is no
# need to add the _waitForDisconnect callback.
# Specifically this avoids the AttributeError exception when
# clientConnectionFailed method is called.
self._disconnectedDeferred = defer.Deferred()
self._set_connection_attributes(request)
# set Host header based on url
self.headers.setdefault('Host', self.netloc)
# set Content-Length based on the length of the body
if self.body is not None:
self.headers['Content-Length'] = len(self.body)
# just in case a broken http/1.1 decides to keep connection alive
self.headers.setdefault("Connection", "close")
Developer: 535521469, Project: crawler_sth, Lines: 28, Source: webclient.py
Example 13: grab_links
def grab_links(self):
if self.document is not None:
for item in self.document.xpath('//a/@href'):
item = urldefrag(item)[0]
url = urlparse(item)
if url.geturl() and item not in self.crawler.visited_urls and url.hostname in self.processor.allowed_urls:
self.crawler.urls.put(item)
Developer: stanfeldman, Project: crawler.py, Lines: 7, Source: spider.py
Example 14: write_opf
def write_opf(self, guide, toc, spine, resource_map):
mi = self.header.exth.mi
if (self.cover_offset is not None and self.cover_offset <
len(resource_map)):
mi.cover = resource_map[self.cover_offset]
if len(list(toc)) < 2:
self.log.warn('KF8 has no metadata Table of Contents')
for ref in guide:
if ref.type == 'toc':
href = ref.href()
href, frag = urldefrag(href)
if os.path.exists(href.replace('/', os.sep)):
try:
toc = self.read_inline_toc(href, frag)
except:
self.log.exception('Failed to read inline ToC')
opf = OPFCreator(os.getcwdu(), mi)
opf.guide = guide
def exclude(path):
return os.path.basename(path) == 'debug-raw.html'
opf.create_manifest_from_files_in([os.getcwdu()], exclude=exclude)
opf.create_spine(spine)
opf.set_toc(toc)
with open('metadata.opf', 'wb') as of, open('toc.ncx', 'wb') as ncx:
opf.render(of, ncx, 'toc.ncx')
return 'metadata.opf'
Developer: Eksmo, Project: calibre, Lines: 32, Source: mobi8.py
Example 15: serialize_href
def serialize_href(self, href, base=None):
'''
Serialize the href attribute of an <a> or <reference> tag. It is
serialized as filepos="000000000" and a pointer to its location is
stored in self.href_offsets so that the correct value can be filled in
at the end.
'''
hrefs = self.oeb.manifest.hrefs
try:
path, frag = urldefrag(urlnormalize(href))
except ValueError:
# Unparseable URL
return False
if path and base:
path = base.abshref(path)
if path and path not in hrefs:
return False
buf = self.buf
item = hrefs[path] if path else None
if item and item.spine_position is None:
return False
path = item.href if item else base.href
href = '#'.join((path, frag)) if frag else path
buf.write(b'filepos=')
self.href_offsets[href].append(buf.tell())
buf.write(b'0000000000')
return True
Developer: Eksmo, Project: calibre, Lines: 27, Source: serializer.py
Example 16: serialize_guide
def serialize_guide(self):
'''
The Kindle decides where to open a book based on the presence of
an item in the guide that looks like
<reference type="text" title="Start" href="chapter-one.xhtml"/>
Similarly an item with type="toc" controls where the Goto Table of
Contents operation on the kindle goes.
'''
buf = self.buf
hrefs = self.oeb.manifest.hrefs
buf.write(b'<guide>')
for ref in self.oeb.guide.values():
path = urldefrag(ref.href)[0]
if path not in hrefs or hrefs[path].media_type not in OEB_DOCS:
continue
buf.write(b'<reference type="')
if ref.type.startswith('other.') :
self.serialize_text(ref.type.replace('other.',''), quot=True)
else:
self.serialize_text(ref.type, quot=True)
buf.write(b'" ')
if ref.title is not None:
buf.write(b'title="')
self.serialize_text(ref.title, quot=True)
buf.write(b'" ')
if is_guide_ref_start(ref):
self._start_href = ref.href
self.serialize_href(ref.href)
# Space required or won't work, I kid you not
buf.write(b' />')
buf.write(b'</guide>')
Developer: Eksmo, Project: calibre, Lines: 35, Source: serializer.py
Example 17: find_mention_item
def find_mention_item(self, items):
"""Returns the mf2 item that mentions (or replies to, likes, etc) the target.
May modify the items arg, e.g. may set or replace content.html or
content.value.
Args:
items: sequence of mf2 item dicts
Returns:
mf2 item dict or None
"""
# find target URL in source
for item in items:
props = item.setdefault('properties', {})
# find first non-empty content element
content = props.setdefault('content', [{}])[0]
text = content.get('html') or content.get('value')
for type in 'in-reply-to', 'like', 'like-of', 'repost', 'repost-of':
urls = [urlparse.urldefrag(u)[0] for u in
microformats2.get_string_urls(props.get(type, []))]
if self.any_target_in(urls):
break
else:
if text and self.any_target_in(text):
type = 'post'
url = first_value(props, 'url') or self.source_url
name = first_value(props, 'name') or first_value(props, 'summary')
text = content['html'] = ('mentioned this in %s.' %
util.pretty_link(url, text=name, max_length=280))
else:
type = None
if type:
# found the target!
rsvp = first_value(props, 'rsvp')
if rsvp:
self.entity.type = 'rsvp'
if not text:
content['value'] = 'RSVPed %s.' % rsvp
else:
self.entity.type = {'in-reply-to': 'comment',
'like-of': 'like',
'repost-of': 'repost',
}.get(type, type)
if not text:
content['value'] = {'comment': 'replied to this.',
'like': 'liked this.',
'repost': 'reposted this.',
}[self.entity.type]
return item
# check children in case this is eg an h-feed
found = self.find_mention_item(item.get('children', []))
if found:
return found
return None
Developer: snarfed, Project: bridgy, Lines: 60, Source: blog_webmention.py
Example 18: _spine_add_extra
def _spine_add_extra(self):
manifest = self.oeb.manifest
spine = self.oeb.spine
unchecked = set(spine)
selector = XPath('h:body//h:a/@href')
extras = set()
while unchecked:
new = set()
for item in unchecked:
if item.media_type not in OEB_DOCS:
# TODO: handle fallback chains
continue
for href in selector(item.data):
href, _ = urldefrag(href)
if not href:
continue
try:
href = item.abshref(urlnormalize(href))
except ValueError: # Malformed URL
continue
if href not in manifest.hrefs:
continue
found = manifest.hrefs[href]
if found.media_type not in OEB_DOCS or \
found in spine or found in extras:
continue
new.add(found)
extras.update(new)
unchecked = new
version = int(self.oeb.version[0])
for item in sorted(extras):
if version >= 2:
self.logger.warn(
'Spine-referenced file %r not in spine' % item.href)
spine.add(item, linear=False)
Developer: mihailim, Project: calibre, Lines: 35, Source: reader.py
Example 19: __init__
def __init__(self, toolpath_object, **kwargs):
try:
makeTool = kwargs.get("makeTool")
self.embedded_tool = makeTool(toolpath_object["run"], **kwargs)
except validate.ValidationException as v:
raise WorkflowException("Tool definition %s failed validation:\n%s" % (toolpath_object["run"]["id"], validate.indent(str(v))))
if "id" in toolpath_object:
self.id = toolpath_object["id"]
else:
self.id = "#step_" + str(random.randint(1, 1000000000))
for field in ("inputs", "outputs"):
for i in toolpath_object[field]:
inputid = i["id"]
(_, d) = urlparse.urldefrag(inputid)
frag = d.split(".")[-1]
p = urlparse.urljoin(toolpath_object["run"].get("id", self.id), "#" + frag)
found = False
for a in self.embedded_tool.tool[field]:
if a["id"] == p:
i.update(a)
found = True
if not found:
raise WorkflowException("Did not find %s parameter '%s' in workflow step" % (field, p))
i["id"] = inputid
super(WorkflowStep, self).__init__(toolpath_object, "Process", do_validate=False, **kwargs)
if self.embedded_tool.tool["class"] == "Workflow":
(feature, _) = self.get_requirement("SubworkflowFeatureRequirement")
if not feature:
raise WorkflowException("Workflow contains embedded workflow but SubworkflowFeatureRequirement not declared")
Developer: robinandeer, Project: common-workflow-language, Lines: 33, Source: workflow.py
Example 20: receive_output
def receive_output(self, jobout, processStatus):
_logger.debug("WorkflowStep output from run is %s", jobout)
self.output = {}
for i in self.tool["outputs"]:
(_, d) = urlparse.urldefrag(i["param"] if "param" in i else i["id"])
self.output[i["id"]] = jobout[d]
self.processStatus = processStatus
Developer: porterjamesj, Project: common-workflow-language, Lines: 7, Source: workflow.py
Note: The urlparse.urldefrag examples in this article were compiled by 纯净天空 from source code and documentation hosted on GitHub, MSDocs, and similar platforms. The snippets were selected from open-source projects contributed by many developers; copyright in the code belongs to the original authors, and any further use or redistribution should follow the corresponding project's license. Please do not repost without permission.