本文整理汇总了Python中urlgrabber.urlread函数的典型用法代码示例。如果您正苦于以下问题：Python urlread函数的具体用法？Python urlread怎么用？Python urlread使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlread函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_fedora_releases
def get_fedora_releases():
    """Collect Live image entries for the two newest Fedora releases.

    Reads the directory listing at FEDORA_RELEASES, picks the two highest
    numbered release directories and, for every architecture in ARCHES,
    parses each ``*-CHECKSUM`` file found under ``<release>/Live/<arch>/``.

    Returns:
        A list of dicts with the keys ``name``, ``url`` and ``sha256``,
        one per image line found in the checksum files.
    """
    releases = []
    html = urlread(FEDORA_RELEASES)
    # Do not rely on the order of the listing: sorted as text, "10/" comes
    # before "9/", and a listing may link the same directory twice.  Sort
    # the distinct version strings numerically, newest first.
    versions = sorted(set(re.findall(r'<a href="(\d+)/">', html)),
                      key=int, reverse=True)[:2]
    for release in versions:
        for arch in ARCHES:
            arch_url = FEDORA_RELEASES + '%s/Live/%s/' % (release, arch)
            files = urlread(arch_url)
            for link in re.findall(r'<a href="(.*)">', files):
                if not link.endswith('-CHECKSUM'):
                    continue
                checksum = urlread(arch_url + link)
                for line in checksum.split('\n'):
                    fields = line.split()
                    # Image lines have exactly two fields: "<hash> *<file>".
                    if len(fields) != 2:
                        continue
                    sha256, filename = fields
                    if not filename.startswith('*'):
                        continue
                    filename = filename[1:]
                    # Drop the 4-character extension and split on dashes.
                    chunks = filename[:-4].split('-')
                    if 'Live' not in chunks:
                        # Same outcome as before: such lines are skipped.
                        continue
                    chunks.remove('Live')
                    releases.append(dict(
                        name=' '.join(chunks),
                        url=arch_url + filename,
                        sha256=sha256,
                    ))
    return releases
开发者ID:onpaws,项目名称:liveusb-creator,代码行数:27,代码来源:get_releases.py
示例2: _weather_get
def _weather_get(self, city, raw=False):
    """Fetch the current weather for *city* from OpenWeatherMap.

    Args:
        city: Text identifying the place.  When it consists only of
            digits it is sent as the ``zip`` parameter, otherwise it is
            passed through ``self._weather_parse_city`` and sent as ``q``.
        raw: When true, return the response body (or the fetch-failure
            message) without interpreting it.

    Returns:
        A one-line summary string, the raw response when *raw* is true,
        or an error message when the download or the parsing fails.
    """
    url = 'http://api.openweathermap.org/data/2.5/weather?{}={}'
    if all(c in '0123456789' for c in city):
        query = url.format('zip', city)
    else:
        query = url.format('q', self._weather_parse_city(city))
    try:
        resp = urlgrabber.urlread(query, size=2097152*10)
    except urlgrabber.grabber.URLGrabError:
        resp = 'Failed to fetch weather for {}'.format(repr(city))
    if raw:
        return resp
    try:
        json_data = json.loads(resp)
        # The labels used to be crossed: "high" showed the current
        # temperature and "currently" showed the maximum.
        return 'Current weather for {city}: {desc}, low:{low:.1f} high:{high:.1f} currently:{cur:.1f}'.format(
            city=json_data['name'],
            desc=json_data['weather'][0]['description'],
            low=self._weather_convert(json_data['main']['temp_min']),
            cur=self._weather_convert(json_data['main']['temp']),
            high=self._weather_convert(json_data['main']['temp_max']),
        )
    except (KeyError, IndexError, TypeError, ValueError):
        # Covers non-JSON bodies (including the fetch-failure message) and
        # JSON payloads that lack the expected fields.
        return 'API error for {}: {}'.format(repr(city), resp)
开发者ID:zonk1024,项目名称:zonkb0t,代码行数:25,代码来源:botcommand.py
示例3: getfips
def getfips(self):
    """Append one FIPS tract code per data row of ``self.sh`` to ``self.fipslist``.

    Each row's address is geocoded through rpc.geocoder.us; the resulting
    latitude/longitude is looked up with the FCC block API and the first
    11 characters of the returned FIPS attribute are stored.  Rows the
    geocoder cannot resolve get the placeholder '99999999999'.
    """
    # Row 0 is the header, so start at row 1.
    for rownum in range(1, self.sh.nrows):
        row = self.sh.row_values(rownum)
        address = row[self.addressfield] + ","
        # Hard coding in Massachusetts!
        city = row[self.cityfield] + ", Ma"
        zipcode = row[self.zipfield]
        buildurl = ('http://rpc.geocoder.us/service/csv?address='
                    + address + '+' + city + '+' + zipcode)
        # Strip non-breaking spaces, then turn ordinary spaces into '+'.
        buildurl = buildurl.replace(u'\xa0', u'').replace(' ', '+')
        # Drop anything that is not ASCII before issuing the request.
        burlstr = buildurl.encode('ascii', 'ignore')
        print(burlstr)
        outp = urlgrabber.urlread(burlstr)
        if outp == "2: couldn't find this address! sorry":
            # Unresolved address: use the all-nines placeholder tract.
            fipstract = '99999999999'
        else:
            parts = outp.split(",")
            lat = parts[0]
            lon = parts[1]
            buildcensurl = ('http://data.fcc.gov/api/block/2010/find?latitude='
                            + lat + '&longitude=' + lon)
            outblock = urlgrabber.urlread(buildcensurl)
            block = ET.fromstring(outblock).find('{http://data.fcc.gov/api}Block')
            fipstract = block.attrib['FIPS'][0:11]
        self.fipslist.append(fipstract)
开发者ID:FlavioFalcao,项目名称:community-action-maps,代码行数:29,代码来源:censusmunger.py
示例4: GET
def GET(self):
    """Handle a GET request carrying an optional ``href`` parameter.

    Sends no-cache headers, runs ``self.restrict_access()`` and, when an
    ``href`` is supplied, either opens it in a new browser tab or, if it
    contains 'url=file', fetches it with urlgrabber instead.
    """
    web.header("Pragma", "no-cache")
    web.header("Cache-Control", "no-cache")
    self.restrict_access()
    params = web.input(href='')
    href = params.href
    if not href:
        return
    if debug:
        web.debug('opening ' + href)
    if 'url=file' not in href:
        webbrowser.open_new_tab(href)
    else:
        urlgrabber.urlread(href)
开发者ID:chuang1990,项目名称:context-agent,代码行数:13,代码来源:webserver.py
示例5: get_fedora_releases
def get_fedora_releases():
    """Refresh the global ``releases`` list from the Fedora mirrors.

    Looks at the two highest release numbers listed at PUB_URL and, for
    each product and each architecture in ARCHES, parses every
    ``*-CHECKSUM`` file in the matching directory.  'Spins' are read from
    ALT_URL; products other than 'Live' and 'Spins' live in an ``iso/``
    subdirectory.

    Returns:
        The global ``releases`` list.  It is replaced only when the scan
        completes; on an unexpected error the traceback is printed and the
        previous value is returned.
    """
    global releases
    fedora_releases = []
    try:
        html = urlread(PUB_URL)
        versions = re.findall(r'<a href="(\d+)/">', html)
        # De-duplicate so a listing that links a directory twice still
        # yields two different releases.
        latest = sorted(set(int(v) for v in versions), reverse=True)[0:2]
        for release in latest:
            if release >= 21:
                products = ('Workstation', 'Server', 'Cloud', 'Live', 'Spins')
            else:
                products = ('Live', 'Spins')
            for product in products:
                for arch in ARCHES:
                    baseurl = PUB_URL
                    if product == 'Live':
                        isodir = '/'
                    elif product == 'Spins':
                        baseurl = ALT_URL
                        isodir = '/'
                    else:
                        isodir = '/iso/'
                    arch_url = baseurl + '%s/%s/%s%s' % (release,
                                                         product, arch, isodir)
                    print(arch_url)
                    try:
                        files = urlread(arch_url)
                    except URLGrabError:
                        continue
                    for link in re.findall(r'<a href="(.*)">', files):
                        if not link.endswith('-CHECKSUM'):
                            continue
                        print('Reading %s' % (arch_url + link))
                        try:
                            checksum = urlread(arch_url + link)
                        except URLGrabError:
                            # One unreadable checksum file should not throw
                            # away everything collected so far.
                            continue
                        for line in checksum.split('\n'):
                            try:
                                sha256, filename = line.split()
                            except ValueError:
                                # Not a "<hash> <file>" line.
                                continue
                            if filename[0] != '*':
                                continue
                            filename = filename[1:]
                            fedora_releases.append(dict(
                                name=filename.replace('.iso', ''),
                                url=arch_url + filename,
                                sha256=sha256,
                            ))
        releases = fedora_releases
    except Exception:
        # Best effort: keep the previous list, but let KeyboardInterrupt
        # and SystemExit propagate.
        traceback.print_exc()
    return releases
开发者ID:enlighter,项目名称:linux-customizations,代码行数:51,代码来源:releases.py
示例6: create_profile
def create_profile(self, distro_name):
    """
    Create and save a test profile, with a random name, for the given
    distro, then check it through the cobbler service URLs.

    The profile name is recorded in self.cleanup_profiles.
    Returns a tuple of profile ID and name.
    """
    profile_name = "%s%s" % (TEST_PROFILE_PREFIX,
                             random.randint(1, 1000000))
    profile_id = self.api.new_profile(self.token)

    # Applied one by one, in this order.
    settings = (
        ("name", profile_name),
        ("distro", distro_name),
        ("kickstart", FAKE_KICKSTART),
        ("kopts", { "dog" : "fido", "cat" : "fluffy" }),
        ("kopts-post", { "phil" : "collins", "steve" : "hackett" }),
        ("ksmeta", "good=sg1 evil=gould"),
        ("breed", "redhat"),
        ("owners", "sam dave"),
        ("mgmt-classes", "blip"),
        ("comment", "test profile"),
        ("redhat_management_key", "1-ABC123"),
        ("redhat_management_server", "mysatellite.example.com"),
        ("virt_bridge", "virbr0"),
        ("virt_cpus", "2"),
        ("virt_file_size", "3"),
        ("virt_path", "/opt/qemu/%s" % profile_name),
        ("virt_ram", "1024"),
        ("virt_type", "qemu"),
    )
    for field, value in settings:
        self.api.modify_profile(profile_id, field, value, self.token)
    self.api.save_profile(profile_id, self.token)
    self.cleanup_profiles.append(profile_name)

    # Check cobbler services URLs:
    server = cfg['cobbler_server']
    ks_url = "http://%s/cblr/svc/op/ks/profile/%s" % (server, profile_name)
    self.assertEquals(FAKE_KS_CONTENTS, urlgrabber.urlread(ks_url))

    list_url = "http://%s/cblr/svc/op/list/what/profiles" % server
    listing = urlgrabber.urlread(list_url)
    self.assertNotEquals(-1, listing.find(profile_name))

    return (profile_id, profile_name)
开发者ID:77720616,项目名称:cobbler,代码行数:50,代码来源:base.py
示例7: daModule
def daModule(line):
    """Look up *line* and return the parsed result.

    Builds the URL with ``buildUrl``, downloads it with ``urlread`` and
    hands the body to ``parsit``.  If the download fails, a bracketed
    "not found" style message containing *line* is returned instead.
    """
    url = buildUrl(line)
    try:
        st = urlread(url)
    except Exception:
        # Any download error means "no such entry"; KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return '[' + line + '] yok böyle bi şii'
    return parsit(st)
开发者ID:GUIpsp,项目名称:neuro-bot,代码行数:7,代码来源:mod_eksi.py
示例8: srpm_from_ticket
def srpm_from_ticket(self):
    '''Download the last SRPM linked from the Bugzilla ticket page.

    Reads the 'ticketURL' checklist property, fetches that page, and
    takes the last quoted http(s)/ftp URL ending in ".src.rpm".  That file
    is downloaded via urlgrabber using self.tmpDir, and the 'SRPMfile'
    property is set to self.tmpDir plus the SRPM's base name.  Nothing is
    set when there is no ticket URL or no SRPM link.
    '''
    try:
        bugzillaURL = self.checklist.properties['ticketURL'].value
    except KeyError:
        # No ticket URL property: nothing to do.
        return
    if not bugzillaURL:
        # Empty ticket URL: nothing to do.
        return

    page = urlgrabber.urlread(bugzillaURL)
    srpm_pattern = re.compile(r'"((ht|f)tp(s)?://.*?\.src\.rpm)"',
                              re.IGNORECASE)
    matches = srpm_pattern.findall(page)
    # Group 0 of the last match is the full URL of the last SRPM listed.
    srpmURL = matches[-1][0] if matches else None
    if not srpmURL:
        # No SRPM on the page: leave the property untouched.
        return

    # Download the SRPM into the temporary directory.
    urlgrabber.urlgrab(srpmURL, self.tmpDir)
    # Point the SRPMfile property at the downloaded file.
    self.checklist.properties['SRPMfile'].value = (
        self.tmpDir + os.path.basename(srpmURL))
开发者ID:BackupTheBerlios,项目名称:qa-assistant-svn,代码行数:28,代码来源:fedoraus.py
示例9: __init__
def __init__(self):
    """Look up and flash the latest firmware for every detected iPod.

    Downloads the gzip-compressed property list at
    http://itunes.com/version, then for each device reported by
    ``self.findPods()`` looks up its family in "iPodSoftwareVersions",
    downloads the firmware archive, extracts every member whose name
    starts with "Firmware" to a local file named ``Firmware``, backs the
    device up to ``Backup`` with ``dd`` and writes the firmware to it.

    NOTE(review): the nesting of the backup/upload steps was reconstructed
    from a listing without indentation -- confirm against the original.
    """
    data = StringIO.StringIO(urlgrabber.urlread("http://itunes.com/version"))
    stream = gzip.GzipFile(fileobj=data)
    data = stream.read()
    updates = plistlib.readPlistFromString(data)
    devs = self.findPods()
    for (dev, name, family, firmware) in devs:
        if not family:
            family, firmware = self.getIPodData(dev)
        print("Found %s with family %s and firmware %s" % (name, family, firmware))
        if unicode(family) in updates["iPodSoftwareVersions"]:
            uri = updates["iPodSoftwareVersions"][unicode(family)]["FirmwareURL"]
            print("Latest firmware: %s" % uri)
            print("Fetching firmware...")
            path = urlgrabber.urlgrab(
                uri, progress_obj=urlgrabber.progress.text_progress_meter(), reget="check_timestamp"
            )
            print("Extracting firmware...")
            zf = zipfile.ZipFile(path)
            # Separate loop name so the device's `name` is not overwritten.
            for member in zf.namelist():
                if member[:8] == "Firmware":
                    print("Firmware found.")
                    outfile = open("Firmware", "wb")
                    try:
                        outfile.write(zf.read(member))
                    finally:
                        outfile.close()
                    # The device used to be opened here with mode "wb" and
                    # never used or closed; that leaked a handle and
                    # touched the device before the backup was taken.
                    # FIXME: do the following in pure python?
                    # NOTE(review): `dev` is interpolated into a shell
                    # command; it must come from a trusted source.
                    print("Making backup...")
                    commands.getoutput("dd if=%s of=Backup" % dev)
                    print("Uploading firmware...")
                    commands.getoutput("dd if=Firmware of=%s" % dev)
                    print("Done.")
开发者ID:cberetta,项目名称:ipod-update,代码行数:33,代码来源:ipod-update.py
示例10: create_distro
def create_distro(self):
    """
    Create and save a test distro with a random name, remember it in
    self.cleanup_distros, and check that the cobbler distro listing
    mentions it.

    Returns a tuple of the object's ID and name.
    """
    distro_name = "%s%s" % (TEST_DISTRO_PREFIX, random.randint(1, 1000000))
    did = self.api.new_distro(self.token)

    # Applied one by one, in this order.
    settings = (
        ("name", distro_name),
        ("kernel", FAKE_KERNEL),
        ("initrd", FAKE_INITRD),
        ("kopts", { "dog" : "fido", "cat" : "fluffy" }),
        ("ksmeta", "good=sg1 evil=gould"),
        ("breed", "redhat"),
        ("os-version", "rhel5"),
        ("owners", "sam dave"),
        ("mgmt-classes", "blip"),
        ("comment", "test distro"),
        ("redhat_management_key", "1-ABC123"),
        ("redhat_management_server", "mysatellite.example.com"),
    )
    for field, value in settings:
        self.api.modify_distro(did, field, value, self.token)
    self.api.save_distro(did, self.token)
    self.cleanup_distros.append(distro_name)

    list_url = "http://%s/cblr/svc/op/list/what/distros" % cfg['cobbler_server']
    listing = urlgrabber.urlread(list_url)
    self.assertNotEquals(-1, listing.find(distro_name))

    return (did, distro_name)
开发者ID:77720616,项目名称:cobbler,代码行数:34,代码来源:base.py
示例11: parse_aur
def parse_aur(self):
    '''
    Read the AUR package list and queue a build task for every package
    that is unknown locally or has a newer version upstream.

    Each non-comment line of self.opts['aur_file'] is looked up through
    the AUR RPC interface.  For a package that needs building, a pickled
    task dict is written into self.opts['not_dir'].
    '''
    self.gen_repo_data()
    aur = 'http://aur.archlinux.org/rpc.php?type=info&arg='
    aur_file = open(self.opts['aur_file'], 'r')
    try:
        lines = aur_file.readlines()
    finally:
        aur_file.close()
    for line in lines:
        line = line.strip()
        # Skip comments, and blank lines that would query an empty name.
        if not line or line.startswith('#'):
            continue
        # SECURITY(review): eval() executes whatever the remote server
        # sends back.  Left as is to keep behaviour, but this should be
        # replaced with a real parser for the response format.
        data = eval(urlgrabber.urlread(aur + line))
        if data['type'] == 'error':
            # log something
            continue
        # Decide per package; the flag used to be set once and then stay
        # true for every later package.
        if line in self.aur:
            # Look the package up in self.aur; the old code indexed the
            # URL string `aur` here, which cannot work.
            # NOTE(review): this is a plain `<` on whatever the two
            # values are -- confirm it orders versions as intended.
            notify = self.aur[line] < data['results']['Version']
        else:
            notify = True
        if not notify:
            continue
        notp = {'type': 'aur_pkg',
                'action': 'build_aur_pkg',
                'name': line}
        notn = str(int(time.time()))\
            + str(random.randint(1000, 9999))
        path = os.path.join(self.opts['not_dir'], notn + 'p')
        task_file = open(path, 'w')
        try:
            pickle.dump(notp, task_file)
        finally:
            task_file.close()
开发者ID:ChristianSP,项目名称:sandbox,代码行数:29,代码来源:tasks.py
示例12: _retrievePublicKey
def _retrievePublicKey(self, keyurl, repo=None):
    """
    Download the GPG key file found at keyurl.

    @param keyurl: url to the key to retrieve
    @param repo: optional repo; when given, its bandwidth, retry,
                 throttle, callback and proxy settings are used for the
                 download.

    Raises ChannelException when urlgrabber reports a download error.

    NOTE(review): this listing appears to stop after the download step;
    nothing is returned in the visible part and key_installed is not read.
    """
    key_installed = False

    # Go get the GPG key from the given URL
    try:
        url = yum.misc.to_utf8(keyurl)
        if repo is not None:
            # A repo was supplied: reuse its proxy and transfer settings.
            # In theory we have a global proxy config. too, but external
            # callers should just update.
            ug = URLGrabber(bandwidth=repo.bandwidth,
                            retry=repo.retries,
                            throttle=repo.throttle,
                            progress_obj=repo.callback,
                            proxies=repo.proxy_dict)
            ug.opts.user_agent = default_grabber.opts.user_agent
            rawkey = ug.urlread(url, text=repo.id + "/gpgkey")
        else:
            rawkey = urlgrabber.urlread(url, limit=9999)
    except urlgrabber.grabber.URLGrabError as e:
        raise ChannelException('GPG key retrieval failed: ' +
                               yum.i18n.to_unicode(str(e)))
开发者ID:m47ik,项目名称:uyuni,代码行数:28,代码来源:yum_src.py
示例13: get_html_url
def get_html_url(vals, url):
    """Download *url* and return a ``(status, payload)`` tuple.

    On success the result is ``(0, <page contents>)``; when urlgrabber
    raises URLGrabError it is ``(-1, <error message>)``.  The URL is
    printed first when ``vals['debug']`` is greater than 1.
    """
    if vals['debug'] > 1:
        print("processing %s" % (url))
    try:
        html_code = urlgrabber.urlread(url)
    except urlgrabber.grabber.URLGrabError:
        # Download failed (e.g. a 404): report it to the caller.
        return (-1, "URL down: %s" % (url))
    return (0, html_code)
开发者ID:chemag,项目名称:libmhtml.py,代码行数:10,代码来源:libmhtml.py
示例14: _url
def _url(self, args):
    """Usage: `{cmd_prefix}url *urls`"""
    # Returns the <title> text of every URL in `args`, one per line, or
    # None when no URL was given.  The docstring above is left untouched
    # because it reads like a usage template that may be shown to users.
    if not args:
        return None
    titles = []
    for target in args:
        # Assume plain http when no scheme was supplied.
        if not target.startswith(('https://', 'http://')):
            target = 'http://{}'.format(target)
        page = urlgrabber.urlread(target, size=2097152*10)
        titles.append(BeautifulSoup.BeautifulSoup(page).title.string)
    return '\n'.join(titles)
开发者ID:zonk1024,项目名称:zonkb0t,代码行数:11,代码来源:botcommand.py
示例15: compare_sha256
def compare_sha256(d, filename, graburl):
    """Tell whether the content at *graburl* matches a recorded checksum.

    Downloads *graburl*, hashes the body with SHA-256 and returns True if
    any entry of ``d.fileDetails`` has the given *filename* and a
    non-None ``sha256`` equal to that digest; otherwise returns False.
    """
    digest = hashlib.sha256(urlgrabber.urlread(graburl)).hexdigest()
    # Iterate over a snapshot of the collection, as before.
    return any(
        fd.filename == filename
        and fd.sha256 is not None
        and fd.sha256 == digest
        for fd in list(d.fileDetails)
    )
开发者ID:yumsuresht,项目名称:mirrormanager2,代码行数:11,代码来源:crawler_perhost.py
示例16: _reddit
def _reddit(self, args):
    """Usage: `{cmd_prefix}reddit [*subreddits]`"""
    # Collects the <a class="title"> links from the front page (no
    # arguments) or from each named subreddit and returns the first five
    # as numbered "title href" lines.  The docstring above is left
    # untouched because it reads like a usage template.
    links = []
    for sub in (args or ['']):
        if sub:
            site = 'http://www.reddit.com/r/{}'.format(sub)
            logger.log((site, ), (None, ))
        else:
            site = 'http://www.reddit.com/'
        page = urlgrabber.urlread(site, size=2097152*10)
        # NOTE(review): BeautifulSOAP (not BeautifulSoup) is what the
        # original uses -- confirm this is intentional.
        soup = BeautifulSoup.BeautifulSOAP(page)
        links.extend(soup.findAll('a', 'title'))
    lines = []
    for idx, link in enumerate(links[:5]):
        lines.append('{}: {} {}'.format(idx + 1, link.string, link.get('href')))
    return '\n'.join(lines)
开发者ID:zonk1024,项目名称:zonkb0t,代码行数:13,代码来源:botcommand.py
示例17: get_fedora_releases
def get_fedora_releases():
    """Refresh the global ``releases`` list with the newest Live images.

    Scans the two highest numbered release directories at FEDORA_RELEASES
    for every architecture in ARCHES, parses the ``*-CHECKSUM`` files and
    builds one entry (``name``, ``url``, ``sha256``) per image.  On
    success the global list becomes those entries followed by
    ``other_releases``; if anything unexpected fails, ``other_releases``
    is appended to the existing list instead.

    Returns:
        The global ``releases`` list.
    """
    global releases
    fedora_releases = []
    try:
        html = urlread(FEDORA_RELEASES)
        # Sort the distinct version strings numerically, newest first,
        # and do not depend on the order of the listing ("10/" sorts
        # before "9/" as text).
        versions = sorted(set(re.findall(r'<a href="(\d+)/">', html)),
                          key=int, reverse=True)[:2]
        for version in versions:
            for arch in ARCHES:
                arch_url = FEDORA_RELEASES + '%s/Live/%s/' % (version, arch)
                try:
                    files = urlread(arch_url)
                except URLGrabError:
                    continue
                for link in re.findall(r'<a href="(.*)">', files):
                    if not link.endswith('-CHECKSUM'):
                        continue
                    checksum = urlread(arch_url + link)
                    for line in checksum.split('\n'):
                        try:
                            sha256, filename = line.split()
                            if filename[0] != '*':
                                continue
                            filename = filename[1:]
                            chunks = filename[:-6].split('-')
                            chunks.remove('Live')
                            # Move the trailing chunk to second position.
                            # It gets its own name: reusing the outer loop
                            # variable made the next architecture's URL use
                            # a value taken from a file name.
                            image_release = chunks.pop()
                            chunks.insert(1, image_release)
                            fedora_releases.append(dict(
                                name=' '.join(chunks),
                                url=arch_url + filename,
                                sha256=sha256,
                            ))
                        except ValueError:
                            # Not an image line, or no 'Live' component.
                            pass
        releases = fedora_releases + other_releases
    except Exception:
        # Can't fetch releases from the internet.  KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        releases += other_releases
    return releases
开发者ID:iceheartwx,项目名称:liveusb-creator,代码行数:38,代码来源:releases.py
示例18: sparqlQuery
def sparqlQuery(query, baseURL, format="application/json", logger=None):
    """Run a SPARQL query through an HTTP GET and return the decoded JSON.

    The query and the requested result *format* are URL-encoded together
    with a fixed set of endpoint parameters and appended to *baseURL*.
    The *logger* argument is accepted but not used here.
    """
    request_fields = {
        "default-graph": "",
        "query": query,
        "debug": "on",
        "timeout": "",
        "format": format,
        "save": "display",
        "fname": ""
    }
    encoded = urllib.urlencode(request_fields)
    return json.loads(urlgrabber.urlread(baseURL + "?" + encoded))
开发者ID:tombech,项目名称:misc,代码行数:17,代码来源:sparqlutils.py
示例19: verify_cert
def verify_cert():
    """
    Print the user certificate's expiry date and warn if it is close.

    The certificate's notAfter date (first 8 characters, YYYYMMDD) is
    printed, and a warning is printed when it falls before the date 21
    days from now.  The Fedora CRL and the certificate serial number are
    fetched as well, but this function does not compare them.
    """
    my_cert = _open_cert()
    serial_no = my_cert.get_serial_number()  # fetched, not used below
    valid_until = my_cert.get_notAfter()[:8]
    crl = urlgrabber.urlread("https://admin.fedoraproject.org/ca/crl.pem")  # fetched, not used below

    # Same YYYYMMDD layout as valid_until, so the two compare directly.
    threshold = datetime.datetime.now() + datetime.timedelta(days=21)
    warn = threshold.strftime('%Y%m%d')

    year, month, day = valid_until[:4], valid_until[4:6], valid_until[6:8]
    print('cert expires: %s-%s-%s' % (year, month, day))
    if valid_until < warn:
        print('WARNING: Your cert expires soon.')
开发者ID:glensc,项目名称:fedora-packager,代码行数:19,代码来源:__init__.py
示例20: find_autoinstall
def find_autoinstall(self, system=None, profile=None, **rest):
    """Fetch the automatic installation file for a system or profile.

    Args:
        system: Name of the system to look up; takes precedence.
        profile: Name of the profile to look up when no system is given.
        **rest: Passed to ``self.autodetect`` when neither is given.

    Returns:
        The body returned by the cobbler service URL, or a ``#`` comment
        line describing the failure when autodetection or the download
        does not succeed.
    """
    self.__xmlrpc_setup()

    # The scheme separator was missing ("http//"), so no request could
    # ever succeed.
    serverseg = "http://%s" % self.collection_mgr._settings.server

    if system is not None:
        # Use the requested name; a literal "?" used to be sent instead.
        url = "%s/cblr/svc/op/autoinstall/system/%s" % (serverseg, system)
    elif profile is not None:
        url = "%s/cblr/svc/op/autoinstall/profile/%s" % (serverseg, profile)
    else:
        name = self.autodetect(**rest)
        if name.startswith("FAILED"):
            return "# autodetection %s" % name
        url = "%s/cblr/svc/op/autoinstall/system/%s" % (serverseg, name)

    try:
        return urlgrabber.urlread(url)
    except Exception:
        # Report the failure in-band, but let KeyboardInterrupt and
        # SystemExit propagate.
        return "# automatic installation file retrieval failed (%s)" % url
开发者ID:ASyriy,项目名称:cobbler,代码行数:20,代码来源:services.py
注：本文中的urlgrabber.urlread函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论