本文整理汇总了Python中urllib.URLopener类的典型用法代码示例。如果您正苦于以下问题:Python URLopener类的具体用法?Python URLopener怎么用?Python URLopener使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了URLopener类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: download_package
def download_package(pkg_name, pkg_version):
    '''Download the sdist archive of one package release into the current directory.

    Sometimes the download can be flaky, so we use the retry decorator (the
    decorator itself is applied outside of this snippet).

    The release metadata is read from the pypi.python.org JSON endpoint, while
    the archive itself is fetched from PYPI_MIRROR.

    Returns True when a file with the expected md5sum is present (either it
    already existed or the download succeeded) and False when the downloaded
    file does not match the expected md5sum. If the release lists no sdist
    archive at all, the process exits with status 1.
    '''
    pkg_type = 'sdist'  # Don't download wheel archives for now
    # This JSON endpoint is not provided by PyPI mirrors so we always need to get this
    # from pypi.python.org.
    pkg_info = json.loads(urlopen('https://pypi.python.org/pypi/%s/json' % pkg_name).read())

    downloader = URLopener()
    for pkg in pkg_info['releases'][pkg_version]:
        if pkg['packagetype'] != pkg_type:
            continue
        filename = pkg['filename']
        expected_md5 = pkg['md5_digest']
        if os.path.isfile(filename) and check_md5sum(filename, expected_md5):
            print("File with matching md5sum already exists, skipping %s" % filename)
            return True
        pkg_url = "{0}/packages/{1}".format(PYPI_MIRROR, pkg['path'])
        print("Downloading %s from %s" % (filename, pkg_url))
        downloader.retrieve(pkg_url, filename)
        # check_md5sum() is the single source of truth for verification; the
        # original additionally hashed the file into an unused local through a
        # file handle that was never closed.
        if check_md5sum(filename, expected_md5):
            return True
        print("MD5 mismatch in file %s." % filename)
        return False

    print("Could not find archive to download for %s %s %s" % (
        pkg_name, pkg_version, pkg_type))
    sys.exit(1)
开发者ID:michaelhkw,项目名称:Impala,代码行数:28,代码来源:pip_download.py
示例2: handle_starttag
def handle_starttag(self, tag, attrs):
    """Download the first relative ``.jpg`` link found in an anchor tag.

    Only ``<a>`` tags are inspected. For the first ``href`` that ends in
    ``jpg`` and is not an absolute ``http://`` link, the image is fetched from
    ``http://apod.nasa.gov/apod/`` into the current directory, unless a file
    with that name already exists. The module-level flag ``bDoWork`` is
    cleared as soon as a candidate link is handled, so later calls do nothing
    until something else sets it again.

    NOTE(review): the indentation of this snippet was lost; the ``break`` is
    assumed to end the attribute loop right after the candidate link was
    handled.
    """
    global bDoWork

    # Only parse the 'anchor' tag.
    if tag != "a":
        return

    for name, value in attrs:
        # Attributes without a value arrive as None; skip them instead of
        # failing on the suffix test.
        if name != "href" or not value or not value.endswith("jpg"):
            continue
        if "http://" in value or not bDoWork:
            continue

        bDoWork = False
        imgurl = 'http://apod.nasa.gov/apod/' + value
        filename = imgurl.split('/')[-1]
        # imgurl is always built with the apod.nasa.gov prefix above, so the
        # only real question is whether the file was downloaded before.
        if not os.path.isfile(filename):
            image = URLopener()
            image.retrieve(imgurl, filename)
            sleep(lWaitTime)
        else:
            print("Verified: " + filename)
        break
开发者ID:DaveZahn,项目名称:APOD_Downloader,代码行数:30,代码来源:APOD_Downloader_Raspbian.py
示例3: check_the_mangas
class check_the_mangas():
    """Track the latest released chapter of one manga on mangareader.net.

    Progress is stored in ``configuration.DATA_FILE`` as lines of the form
    ``<manga_name>:<chapter>:<date>``.
    """

    def __init__(self, manga_name):
        """Load the stored chapter number and date for ``manga_name``."""
        self.manga_name = manga_name
        with open(configuration.DATA_FILE, 'r') as data_file:
            self.myfile = data_file.read()
        self.manga_oldnumber = self.get_number()
        self.manga_nownumber = self.manga_oldnumber
        self.manga_olddate = self.get_date()
        self.nowdate = self.today_date()
        self.br = URLopener()

    def get_number(self):
        """Return the stored chapter number (as a string) for this manga."""
        return re.findall(self.manga_name+':([0-9]+):', self.myfile)[0]

    def get_date(self):
        """Return the date stored next to the old chapter number."""
        return re.findall(self.manga_name+":"+str(self.manga_oldnumber)+':(.*)\n', self.myfile)[0]

    def today_date(self):
        """Return the output of ``date +%a-%b-%e`` without newlines."""
        return subprocess.check_output(["date", "+%a-%b-%e"]).replace("\n", "")

    def test_connection(self):
        """Return 1 if the connection is working, 0 otherwise."""
        try:
            response = self.br.open(configuration.WEBSITE_TO_CHECK_CONNECTION).read()
        except Exception:
            print("manga connection")
            return 0
        return 1 if configuration.KEYWORD in response else 0

    def exec_cmd(self):
        """Run the configured "new chapter" command for this manga.

        NOTE(review): the result of os.fork() is ignored, so both the parent
        and the forked child go on to run the command - confirm that this is
        intended.
        """
        pid = os.fork()
        os.umask(0)
        os.system(configuration.MANGA_NEW_CMD.replace("MANGA", self.manga_name))

    def _is_recorded(self):
        """Tell whether the current chapter number is already in the data file."""
        with open(configuration.DATA_FILE, "r") as data_file:
            return self.manga_name + ":" + str(self.manga_nownumber) in data_file.read()

    def _write_progress(self):
        """Back up the data file and replace the old entry with the current one."""
        configuration.backup()
        old_entry = self.manga_name+":"+str(self.manga_oldnumber)+":"+self.manga_olddate
        new_entry = self.manga_name+":"+str(self.manga_nownumber)+":"+self.nowdate
        # Read the backup before opening the data file for writing, so a
        # missing backup cannot leave the data file truncated.
        with open(configuration.DATA_FILE+".bak", "r") as backup:
            content = backup.read()
        with open(configuration.DATA_FILE, 'w') as data_file:
            data_file.write(content.replace(old_entry, new_entry))

    def run(self):
        """Walk forward through chapters until one is not released yet.

        When the last released chapter differs from the stored one, the
        notification command is started in a thread and the data file is
        updated. Errors are printed, not raised.
        """
        if not self.test_connection():
            return
        # Pre-bind so the error handler can inspect it even when the very
        # first request fails.
        response = ""
        last_chapter = False
        try:
            while not last_chapter:
                to_open = "http://www.mangareader.net/" + self.manga_name + "/" + str(int(self.manga_nownumber)+1)
                response = self.br.open(to_open).read()
                if "is not released yet" in response or "not published yet" in response or response == "":
                    last_chapter = True
                    if not self._is_recorded():
                        Thread(target=self.exec_cmd).start()
                        self._write_progress()
                else:
                    print("not last chapter")
                    self.manga_nownumber = str(int(self.manga_nownumber)+1)
        except Exception as e:
            print(e)
            print("manga run")
            # The failure may have happened after the "not released" page was
            # already fetched; still persist the progress in that case.
            if "is not released yet. If you liked" in response:
                if not self._is_recorded():
                    self._write_progress()
开发者ID:venam,项目名称:updater,代码行数:60,代码来源:mangas.py
示例4: command
def command(self):
    """Issue a GET or POST request against the HTTP server and report the result.

    ``self.args`` is ``[method, url, ...]``. A URL that does not start with
    ``http`` is treated as a path on the configured ``http_host:http_port``.
    Remaining arguments containing ``=`` are ``key=value`` pairs; an argument
    starting with ``P`` or ``Q`` (any case) makes the following pairs POST
    variables or query-string variables respectively. Pairs go to the POST
    body by default for POST requests and to the query string otherwise.

    The request is sent with a session cookie for a session id that is
    created once (module-level ``HACKS_SESSION_ID``) and marked as logged in.

    Returns ``self._success(...)`` with the response headers and data (JSON
    responses are decoded), or ``self._error(...)`` if anything fails.

    NOTE(review): the indentation of this snippet was lost; ``SetLoggedIn``
    is assumed to run on every call, not only when the session id is created.
    """
    args = list(self.args)
    method, url = args[0:2]

    if not url.startswith('http'):
        url = 'http://%s:%s%s' % (self.session.config.sys.http_host,
                                  self.session.config.sys.http_port,
                                  ('/' + url).replace('//', '/'))

    # FIXME: The python URLopener doesn't seem to support other verbs,
    #        which is really quite lame.
    method = method.upper()
    assert(method in ('GET', 'POST'))

    qv, pv = [], []
    which = pv if method == 'POST' else qv
    for arg in args[2:]:
        if '=' in arg:
            which.append(tuple(arg.split('=', 1)))
            continue
        # Slicing instead of indexing keeps an empty argument harmless.
        selector = arg[:1].upper()
        if selector == 'P':
            which = pv
        elif selector == 'Q':
            which = qv

    if qv:
        qv = urlencode(qv)
        url += ('&' if '?' in url else '?') + qv

    # Log us in automagically!
    httpd = self.session.config.http_worker.httpd
    global HACKS_SESSION_ID
    if HACKS_SESSION_ID is None:
        HACKS_SESSION_ID = httpd.make_session_id(None)
    mailpile.auth.SetLoggedIn(None,
                              user='Hacks plugin HTTP client',
                              session_id=HACKS_SESSION_ID)
    cookie = httpd.session_cookie

    try:
        uo = URLopener()
        uo.addheader('Cookie', '%s=%s' % (cookie, HACKS_SESSION_ID))
        with TcpConnBroker().context(need=[TcpConnBroker.OUTGOING_HTTP]):
            if method == 'POST':
                (fn, hdrs) = uo.retrieve(url, data=urlencode(pv))
            else:
                (fn, hdrs) = uo.retrieve(url)
        hdrs = unicode(hdrs)
        with open(fn, 'rb') as fd:
            data = fd.read().strip()
        if data.startswith('{') and 'application/json' in hdrs:
            data = json.loads(data)
        return self._success('%s %s' % (method, url), result={
            'headers': hdrs.splitlines(),
            'data': data
        })
    except Exception:
        # Best effort: any failure is recorded and reported as an error
        # result instead of being raised.
        self._ignore_exception()
        return self._error('%s %s' % (method, url))
开发者ID:Shoply,项目名称:Mailpile,代码行数:60,代码来源:hacks.py
示例5: unshortenurl
def unshortenurl(short):
    """Open ``short`` with a plain ``URLopener`` and capture any ``IOError``.

    As written, the function always returns ``None``: a successful response
    is discarded and a raised ``IOError`` is only stored in a local.

    NOTE(review): the snippet looks truncated - presumably the captured error
    was meant to be inspected afterwards; confirm against the full source.
    """
    from urllib import URLopener

    try:
        URLopener().open(short)
    except IOError as err:
        # Captured but never used within the visible code.
        f = err
开发者ID:Crazy-Cabbage,项目名称:wet,代码行数:7,代码来源:lib.py
示例6: read_component_sitemap
def read_component_sitemap(self, sitemapindex_uri, sitemap_uri, sitemap, sitemapindex_is_file):
    """Read a component sitemap of a Resource List with index

    The resources parsed from the component sitemap are added to
    ``self.resources``.

    Parameters:
      sitemapindex_uri     - URI of the sitemapindex that lists this sitemap
      sitemap_uri          - URI of the component sitemap to read
      sitemap              - object whose ``parse_xml`` is used to parse it
      sitemapindex_is_file - true if the sitemapindex came from a local file,
                             in which case a non-file ``sitemap_uri`` is
                             mapped to a local file via ``self.mapper``

    Raises ListBaseIndexError if the sitemapindex has no authority over the
    sitemap location (only checked when ``self.check_url_authority`` is set
    and the index is not a local file) or if the sitemap cannot be opened.
    """
    if sitemapindex_is_file:
        if not self.is_file_uri(sitemap_uri):
            # Attempt to map URI to local file
            remote_uri = sitemap_uri
            sitemap_uri = self.mapper.src_to_dst(remote_uri)
            self.logger.info("Mapped %s to local file %s" % (remote_uri, sitemap_uri))
    elif (self.check_url_authority and
          not UrlAuthority(sitemapindex_uri).has_authority_over(sitemap_uri)):
        # The individual sitemaps should be at a URL (scheme/server/path)
        # that the sitemapindex URL can speak authoritatively about
        raise ListBaseIndexError("The sitemapindex (%s) refers to sitemap at a location it does not have authority over (%s)" % (sitemapindex_uri,sitemap_uri))

    try:
        fh = URLopener().open(sitemap_uri)
        self.num_files += 1
    except IOError as e:
        raise ListBaseIndexError("Failed to load sitemap from %s listed in sitemap index %s (%s)" % (sitemap_uri,sitemapindex_uri,str(e)))

    try:
        # Get the Content-Length if we can (works fine for local files)
        try:
            length = int(fh.info()['Content-Length'])
        except (KeyError, TypeError, ValueError):
            # If we don't get a usable length then c'est la vie
            length = None

        if length is None:
            # Do not report the length left over from a previously read file.
            self.logger.info("Reading sitemap from %s (unknown size)" % (sitemap_uri))
        else:
            self.content_length = length
            self.bytes_read += length
            self.logger.info("Reading sitemap from %s (%d bytes)" % (sitemap_uri, length))

        component = sitemap.parse_xml(fh=fh, sitemapindex=False)
        # Copy resources into self, check any metadata
        for r in component:
            self.resources.add(r)
    finally:
        fh.close()
开发者ID:EHRI,项目名称:resync,代码行数:34,代码来源:list_base_with_index.py
示例7: getcif
def getcif(target):
    """
    Get all ICSD cif files listed in target file.
    The target file should contain tag like '# BCC'.

    Every line starting with '#' sets the current tag (its last
    whitespace-separated word); every other non-empty line names a compound
    that is downloaded to ./ciffiles/<name> from
    prefix + tag + '/' + name + '/' + name + '.cif'. Compounds whose ID is
    returned by getMatgenIDs() are skipped. Failed downloads are reported
    and do not stop the run.

    Raises ValueError if a compound line appears before any tag line.
    """
    matgenIDs = getMatgenIDs()
    if not os.path.isdir('./ciffiles'):
        os.makedirs('./ciffiles')

    downloader = URLopener()
    tg = None
    with open(target, 'r') as f:
        t1 = time.time()
        # Iterating over the file advances to the next line on every path;
        # the original skipped its readline() when a compound was filtered
        # out and then looped forever on the same line.
        for line in f:
            if line[0] == '#':
                tg = line.split()[-1]
                t2 = time.time()
                print("time for the %s = %2.2f sec" % (tg, t2 - t1))
                t1 = time.time()
                continue

            st = line.strip()
            if not st:
                continue  # blank line, nothing to download
            if tg is None:
                raise ValueError("compound %r listed before any '# TAG' line in %s" % (st, target))

            ind = getID(st)
            if ind in matgenIDs:
                continue  # skip matgen compounds

            URL = prefix + tg + '/' + st + '/' + st + '.cif'
            try:
                downloader.retrieve(URL, 'ciffiles/' + st)
            except Exception:
                print("Error:  %s" % URL)
开发者ID:3juholee,项目名称:materialproject_ml,代码行数:33,代码来源:api_icsd_aflow.py
示例8: check_the_mangas
class check_the_mangas():
    """Track the latest released chapter of one manga on mangareader.net.

    The last known chapter and date are read from and written to a database
    through the ``sqlite_manager`` helpers.
    """

    def __init__(self, manga_name, db_conn):
        """Load the stored chapter and date of ``manga_name`` via ``db_conn``."""
        self.db_conn = db_conn
        self.manga_name = manga_name
        self.manga_oldnumber = sqlite_manager.get_manga_chapter(
            db_conn,
            manga_name)
        self.manga_nownumber = self.manga_oldnumber
        self.manga_olddate = sqlite_manager.get_manga_date(
            db_conn,
            manga_name)
        self.nowdate = self.today_date()
        self.br = URLopener()

    def today_date(self):
        """Return the output of ``date +%a-%b-%e`` without newlines."""
        return subprocess.check_output(["date", "+%a-%b-%e"]).replace("\n", "")

    def test_connection(self):
        """Return 1 if the connection is working, 0 otherwise."""
        try:
            response = self.br.open(configuration.WEBSITE_TO_CHECK_CONNECTION).read()
        except Exception:
            print("manga connection")
            return 0
        return 1 if configuration.KEYWORD in response else 0

    def exec_cmd(self):
        """Run the configured "new chapter" command for this manga.

        NOTE(review): the result of os.fork() is ignored, so both the parent
        and the forked child go on to run the command - confirm that this is
        intended.
        """
        pid = os.fork()
        os.umask(0)
        os.system(configuration.MANGA_NEW_CMD.replace("MANGA", self.manga_name))

    def _store_if_changed(self):
        """Print and store the current chapter if it differs from the database.

        NOTE(review): the comparison is between whatever get_manga_chapter()
        returns and a value that becomes a string once incremented - verify
        that both sides have the same type.
        """
        stored = sqlite_manager.get_manga_chapter(self.db_conn, self.manga_name)
        if self.manga_nownumber != stored:
            # str() keeps the message printable even before the chapter
            # number has been turned into a string by an increment.
            print(self.manga_name+":"+str(self.manga_nownumber)+":"+self.nowdate)
            sqlite_manager.update_manga(self.db_conn,
                                        self.manga_name,
                                        self.manga_nownumber,
                                        self.nowdate)

    def run(self):
        """Walk forward through chapters until one is not released yet.

        The last released chapter is stored when it differs from the one in
        the database. Errors while fetching are not raised.
        """
        if not self.test_connection():
            return
        # Pre-bind so the error handler can inspect it even when the very
        # first request fails.
        response = ""
        last_chapter = False
        try:
            while not last_chapter:
                to_open = "http://www.mangareader.net/" + self.manga_name + "/" + str(int(self.manga_nownumber)+1)
                response = self.br.open(to_open).read()
                if "is not released yet" in response or "not published yet" in response or response == "":
                    last_chapter = True
                    self._store_if_changed()
                else:
                    self.manga_nownumber = str(int(self.manga_nownumber)+1)
        except Exception:
            # The failure may have happened after the "not released" page was
            # already fetched; still persist the progress in that case.
            if "is not released yet. If you liked" in response:
                self._store_if_changed()
开发者ID:venam,项目名称:updater_v2,代码行数:60,代码来源:mangas.py
示例9: command
def command(self):
    """Issue a GET or POST request against the HTTP server and report the result.

    ``self.args`` is ``[method, url, ...]``. A URL that does not start with
    ``http`` is treated as a path on the configured ``http_host:http_port``.
    Remaining arguments containing ``=`` are ``key=value`` pairs; an argument
    starting with ``P`` or ``Q`` (any case) makes the following pairs POST
    variables or query-string variables respectively. Pairs go to the POST
    body by default for POST requests and to the query string otherwise.

    The request is sent with a session cookie for a session id that is
    created once (module-level ``HACKS_SESSION_ID``) and marked as logged in.

    Returns ``self._success(...)`` with the response headers and data (JSON
    responses are decoded), or ``self._error(...)`` if anything fails.

    NOTE(review): the indentation of this snippet was lost; ``SetLoggedIn``
    is assumed to run on every call, not only when the session id is created.
    """
    args = list(self.args)
    method, url = args[0:2]

    if not url.startswith("http"):
        url = "http://%s:%s%s" % (
            self.session.config.sys.http_host,
            self.session.config.sys.http_port,
            ("/" + url).replace("//", "/"),
        )

    # FIXME: The python URLopener doesn't seem to support other verbs,
    #        which is really quite lame.
    method = method.upper()
    assert method in ("GET", "POST")

    qv, pv = [], []
    which = pv if method == "POST" else qv
    for arg in args[2:]:
        if "=" in arg:
            which.append(tuple(arg.split("=", 1)))
            continue
        # Slicing instead of indexing keeps an empty argument harmless.
        selector = arg[:1].upper()
        if selector == "P":
            which = pv
        elif selector == "Q":
            which = qv

    if qv:
        qv = urlencode(qv)
        url += ("&" if "?" in url else "?") + qv

    # Log us in automagically!
    httpd = self.session.config.http_worker.httpd
    global HACKS_SESSION_ID
    if HACKS_SESSION_ID is None:
        HACKS_SESSION_ID = httpd.make_session_id(None)
    mailpile.auth.SetLoggedIn(None, user="Hacks plugin HTTP client", session_id=HACKS_SESSION_ID)
    cookie = httpd.session_cookie

    try:
        uo = URLopener()
        uo.addheader("Cookie", "%s=%s" % (cookie, HACKS_SESSION_ID))
        with TcpConnBroker().context(need=[TcpConnBroker.OUTGOING_HTTP], oneshot=True):
            if method == "POST":
                (fn, hdrs) = uo.retrieve(url, data=urlencode(pv))
            else:
                (fn, hdrs) = uo.retrieve(url)
        hdrs = unicode(hdrs)
        with open(fn, "rb") as fd:
            data = fd.read().strip()
        if data.startswith("{") and "application/json" in hdrs:
            data = json.loads(data)
        return self._success("%s %s" % (method, url), result={"headers": hdrs.splitlines(), "data": data})
    except Exception:
        # Best effort: any failure is recorded and reported as an error
        # result instead of being raised.
        self._ignore_exception()
        return self._error("%s %s" % (method, url))
开发者ID:kitsuta,项目名称:Mailpile,代码行数:57,代码来源:hacks.py
示例10: connection
def connection():
    """Return 1 if the connectivity check page can be fetched and contains
    ``configuration.KEYWORD``, 0 otherwise (including on any fetch error)."""
    try:
        br = URLopener()
        response = br.open(configuration.WEBSITE_TO_CHECK_CONNECTION).read()
    except Exception:
        # Any failure counts as "no connection"; KeyboardInterrupt and
        # SystemExit are deliberately not swallowed.
        return 0
    return 1 if configuration.KEYWORD in response else 0
开发者ID:venam,项目名称:updater,代码行数:10,代码来源:mangas.py
示例11: __init__
def __init__(self, source, proxy = ""):
    """Set up the opener, the fetch queue and the daemon fetch thread.

    ``source`` is stored as-is. A non-empty ``proxy`` is used as the HTTP
    proxy of the opener. The fetch thread runs ``self._FetchTile`` and is
    started immediately.
    """
    self.source = source
    # Only hand a proxy mapping to the opener when a proxy was supplied.
    self._opener = URLopener({"http": proxy}) if len(proxy) > 0 else URLopener()
    self._fetchQueue = Queue(0)
    worker = Thread(target = self._FetchTile)
    worker.setDaemon(True)
    self._fetchThread = worker
    worker.start()
开发者ID:CiderMan,项目名称:PGTips,代码行数:10,代码来源:tile_cache.py
示例12: utGrabFromUrl
def utGrabFromUrl(p_url):
    """ Takes a file from a remote server

    Returns a ``(content, content_type)`` tuple. If the URL cannot be opened
    or read, or the response has no Content-Type header, the result is
    ``(None, 'text/x-unknown-content-type')``.
    """
    from urllib import URLopener

    l_opener = None
    try:
        l_opener = URLopener()
        l_file = l_opener.open(p_url)
        try:
            ctype = l_file.headers['Content-Type']
            return (l_file.read(), ctype)
        finally:
            # Release the response whether or not reading it worked.
            l_file.close()
    except Exception:
        return (None, 'text/x-unknown-content-type')
    finally:
        # Close the opener on the error path too, and only after the
        # response has been read.
        if l_opener is not None:
            l_opener.close()
开发者ID:eea,项目名称:Products.Reportek,代码行数:11,代码来源:RepUtils.py
示例13: SlippyCache
class SlippyCache(object):
    """This is a basic map tile cache used by the SlippyPanel class
    to retrieve and store locally the images that form the map"""

    def __init__(self, source, proxy = ""):
        """Create the opener, the fetch queue and start the daemon fetch thread.

        A non-empty ``proxy`` is used as the HTTP proxy of the opener.
        """
        self.source = source
        if len(proxy) > 0:
            self._opener = URLopener({"http": proxy})
        else:
            self._opener = URLopener()
        self._fetchQueue = Queue(0)
        self._fetchThread = Thread(target = self._FetchTile)
        self._fetchThread.setDaemon(True)
        self._fetchThread.start()

    def _FetchTile(self):
        """Worker loop: download queued ``(url, fname)`` tiles until ``None`` arrives.

        Tiles that already exist on disk are skipped. A failed download is
        ignored; the tile simply stays missing from the cache.
        """
        while True:
            task = self._fetchQueue.get()
            try:
                if task is None:
                    # Sentinel: stop the worker instead of trying to unpack it.
                    return
                url, fname = task
                if not os.path.isfile(fname):
                    print("Getting %s" % fname)
                    try:
                        # Download to a scratch name first so a partial
                        # download never appears under the final tile name.
                        self._opener.retrieve(url, "tmp.png")
                        shutil.move("tmp.png", fname)
                    except IOError:
                        pass
            finally:
                # Every get() is matched by a task_done(), even on errors.
                self._fetchQueue.task_done()

    def StartNewFetchBatch(self):
        """Discard all fetch requests that are still waiting in the queue."""
        try:
            while True:
                self._fetchQueue.get(False)
                self._fetchQueue.task_done()
        except Empty:
            pass

    def GetTileFilename(self, xtile, ytile, zoom):
        """Return the cache filename of a tile, queueing a download if needed.

        ``xtile`` values at or beyond the number of tiles for ``zoom`` wrap
        around. Returns ``None`` for an invalid tile (negative ``xtile`` or
        ``ytile``, or ``ytile`` beyond the tile count). For a valid tile the
        filename is returned even though the file may not be present yet.
        """
        numTiles = 2 ** zoom
        while xtile >= numTiles:
            xtile -= numTiles

        if xtile < 0 or ytile < 0 or ytile >= numTiles:
            # Indicate that this is not a valid tile
            return None

        fname = "/".join([self.source.get_full_name(), str(zoom), str(xtile), str(ytile) + ".png"])
        if not os.path.isfile(fname):
            url = self.source.get_tile_url(xtile, ytile, zoom)
            # Ensure that the directory exists
            dname = os.path.dirname(fname)
            if not os.path.isdir(dname):
                os.makedirs(dname)
            self._fetchQueue.put((url, fname))
        # Valid tile, though may not be present in the cache
        return fname
开发者ID:CiderMan,项目名称:PGTips,代码行数:54,代码来源:tile_cache.py
示例14: read
def read(self, uri=None, resources=None, index_only=False):
    """Read sitemap from a URI including handling sitemapindexes

    If index_only is True then individual sitemaps references in a sitemapindex
    will not be read. This will result in no resources being returned and is
    useful only to read the metadata and links listed in the sitemapindex.

    Includes the subtlety that if the input URI is a local file and is a
    sitemapindex which contains URIs for the individual sitemaps, then these
    are mapped to the filesystem also.

    The ``resources`` argument is accepted but not used by this method.

    Raises IOError if the URI cannot be opened and ListBaseIndexError if a
    sitemapindex is found while ``self.allow_multifile`` is false.
    """
    try:
        fh = URLopener().open(uri)
        self.num_files += 1
    except IOError as e:
        raise IOError("Failed to load sitemap/sitemapindex from %s (%s)" % (uri,str(e)))

    try:
        # Get the Content-Length if we can (works fine for local files)
        try:
            self.content_length = int(fh.info()['Content-Length'])
            self.bytes_read += self.content_length
            self.logger.debug( "Read %d bytes from %s" % (self.content_length,uri) )
        except (KeyError, TypeError, ValueError):
            # If we don't get a usable length then c'est la vie
            self.logger.debug( "Read ????? bytes from %s" % (uri) )
        self.logger.info( "Read sitemap/sitemapindex from %s" % (uri) )
        s = self.new_sitemap()
        s.parse_xml(fh=fh,resources=self,capability=self.capability_name)
    finally:
        # The document has been parsed (or parsing failed); either way the
        # response is no longer needed.
        fh.close()

    # what did we read? sitemap or sitemapindex?
    if not s.parsed_index:
        # sitemap
        self.logger.info( "Parsed as sitemap, %d resources" % (len(self.resources)) )
        return

    # sitemapindex
    if not self.allow_multifile:
        raise ListBaseIndexError("Got sitemapindex from %s but support for sitemapindex disabled" % (uri))
    self.logger.info( "Parsed as sitemapindex, %d sitemaps" % (len(self.resources)) )
    sitemapindex_is_file = self.is_file_uri(uri)
    if index_only:
        # don't read the component sitemaps
        self.sitemapindex = True
        return
    # now loop over all entries to read each sitemap and add to resources
    sitemaps = self.resources
    self.resources = self.resources_class()
    self.logger.info( "Now reading %d sitemaps" % len(sitemaps.uris()) )
    for sitemap_uri in sorted(sitemaps.uris()):
        self.read_component_sitemap(uri,sitemap_uri,s,sitemapindex_is_file)
开发者ID:EHRI,项目名称:resync,代码行数:48,代码来源:list_base_with_index.py
示例15: command
def command(self):
    """Issue a GET or POST request and report the result.

    ``self.args`` is ``[method, url, ...]``. A URL that does not start with
    ``http`` is treated as a path on the configured ``http_host:http_port``.
    Remaining arguments containing ``=`` are ``key=value`` pairs; an argument
    starting with ``P`` or ``Q`` (any case) makes the following pairs POST
    variables or query-string variables respectively. Pairs go to the POST
    body by default for POST requests and to the query string otherwise.

    Returns ``self._success(...)`` with the response headers and data (JSON
    responses are decoded), or ``self._error(...)`` if anything fails.
    """
    args = list(self.args)
    method, url = args[0:2]

    if not url.startswith('http'):
        url = 'http://%s:%s%s' % (self.session.config.sys.http_host,
                                  self.session.config.sys.http_port,
                                  ('/' + url).replace('//', '/'))

    # FIXME: The python URLopener doesn't seem to support other verbs,
    #        which is really quite lame.
    method = method.upper()
    assert(method in ('GET', 'POST'))

    qv, pv = [], []
    which = pv if method == 'POST' else qv
    for arg in args[2:]:
        if '=' in arg:
            which.append(tuple(arg.split('=', 1)))
            continue
        # Slicing instead of indexing keeps an empty argument harmless.
        selector = arg[:1].upper()
        if selector == 'P':
            which = pv
        elif selector == 'Q':
            which = qv

    if qv:
        qv = urlencode(qv)
        url += ('&' if '?' in url else '?') + qv

    try:
        uo = URLopener()
        if method == 'POST':
            (fn, hdrs) = uo.retrieve(url, data=urlencode(pv))
        else:
            (fn, hdrs) = uo.retrieve(url)
        hdrs = unicode(hdrs)
        with open(fn, 'rb') as fd:
            data = fd.read().strip()
        if data.startswith('{') and 'application/json' in hdrs:
            data = json.loads(data)
        return self._success('%s %s' % (method, url), result={
            'headers': hdrs.splitlines(),
            'data': data
        })
    except Exception:
        # Best effort: any failure is recorded and reported as an error
        # result instead of being raised.
        self._ignore_exception()
        return self._error('%s %s' % (method, url))
开发者ID:bart-j,项目名称:Mailpile,代码行数:48,代码来源:hacks.py
示例16: __init__
def __init__(self,manga_name):
    """Load the stored chapter number and date for ``manga_name``.

    The whole of ``configuration.DATA_FILE`` is read into ``self.myfile``;
    the old chapter number and date are then obtained through
    ``self.get_number()`` and ``self.get_date()``, and today's date through
    ``self.today_date()``.
    """
    self.manga_name = manga_name
    # Close the data file as soon as its content has been read.
    with open(configuration.DATA_FILE, 'r') as data_file:
        self.myfile = data_file.read()
    self.manga_oldnumber = self.get_number()
    self.manga_nownumber = self.manga_oldnumber
    self.manga_olddate = self.get_date()
    self.nowdate = self.today_date()
    self.br = URLopener()
开发者ID:venam,项目名称:updater,代码行数:8,代码来源:mangas.py
示例17: download_reports
def download_reports(years=_years, weeks=_weeks):
    '''Crawls through IMoH website and download all excel files in the given weeks and years

    For every (year, week) several candidate URLs are tried in order (week
    number zero-padded or not, ``.xls`` or ``.xlsx``); the first one that
    downloads is saved as ``./data/weeklies/<year>_<week:02d>.<ext>``.

    Every attempt is recorded in a time-stamped downloads log under
    ``./log/weeklies/``. Weeks for which all candidates failed are also
    printed and written to a FAILED log, except week 53 (presumably because
    not every year has one - confirm).
    '''
    import sys

    # Create paths for logging files and download location
    prefix = datetime.now().strftime('./log/weeklies/%y%m%d_%H%M%S_')
    log_d = prefix + "downloads.log"
    log_f = prefix + "FAILED.log"
    base_loc = 'http://www.health.gov.il/PublicationsFiles/IWER'
    # URL object
    my_file = URLopener()

    for year in years:
        # Progress output stays on one line per year.
        sys.stdout.write("\n%s " % year)
        for week in weeks:
            # There are many different options of paths
            options = ['{base}{week}_{year}.{ext}'.format(base=base_loc, week=wk, year=year, ext=ext)
                       for ext in ('xls', 'xlsx')
                       for wk in ('{week:02d}'.format(week=week), week)]
            # The context manager closes the log on every path, including
            # unexpected errors.
            with open(log_d, 'a') as f:
                f.write('\n{year}_{week}: '.format(week=week, year=year))
                for i, o in enumerate(options):
                    filetype = o.split(".")[-1]
                    # Try different paths on remote, but always save on same path locally
                    target = './data/weeklies/{year}_{week:02d}.{ft}'.format(week=week, year=year, ft=filetype)
                    try:
                        my_file.retrieve(o, target)
                    except Exception:
                        # When option excepted, write try number to the log
                        f.write("{} ".format(i + 1))
                    else:
                        # If succeeds write which filetype (xls/x) was saved
                        f.write('{ft}'.format(ft=filetype))
                        break
                else:
                    # If all options were exhausted, it has failed.
                    if week != 53:
                        sys.stdout.write("== {year}_{week:02d} FAILED == ".format(week=week, year=year))
                        with open(log_f, 'a') as failed:
                            failed.write("{year}_{week:02d} FAILED\n".format(week=week, year=year))
                        f.write("FAILED")
开发者ID:DeanLa,项目名称:IMoH,代码行数:42,代码来源:__init__.py
示例18: call_remote
def call_remote(self,category,params):
    '''
    Call one category of the meetup api and return the decoded JSON reply.

    The root url only varies in the 'name' of the thing being called, so
    category is substituted into self.root_url and params (an already
    encoded query string) is appended, followed by the api key that every
    call has to include.

    Both the meta info and the results are passed back untouched; it is up
    to the caller to interpret them.
    '''
    endpoint = self.root_url % (category)
    # Every call has to include key
    query = params + "&key=" + self.key
    reply = URLopener().open(endpoint + "?" + query)
    return json.loads(reply.read())
开发者ID:PhillyPUG,项目名称:phillypug,代码行数:20,代码来源:meetup.py
示例19: Updater
class Updater:
    """
    takes a server location and an info file as parameters in the constructor
    it will use this server to fetch the new information
    there should be a /hash and /info.json dir on this server
    """

    def __init__(self,server,infoFile):
        self._server = server
        self._infoFile = infoFile
        self.br = URLopener()

    def hasNewInfo(self):
        """
        hasNewInfo :: Boolean
        compare the local info file hash with the one found on the server
        and returns true if they are different
        """
        with open(self._infoFile,'r') as info:
            m = md5.new(info.read()).hexdigest()
        response = self.br.open(self._server+'/hash').read()
        response = response.replace("\n","")
        return (m!=response)

    def generateTimeStamp(self):
        """
        generateTimeStamp :: String
        returns a string that is used to timestamp old config backup files

        The format is year_day_hour_minute in UTC.
        NOTE(review): the month is not part of the stamp, so backups made on
        the same day-of-month, hour and minute in different months share a
        name - confirm this is acceptable.
        """
        # One snapshot, so all fields describe the same instant.
        now = time.gmtime()
        return "_".join(str(field) for field in
                        (now.tm_year, now.tm_mday, now.tm_hour, now.tm_min))

    def fetchNewInfo(self):
        """
        fetchNewInfo :: Void
        it will download the info file from the server
        use the timestamp to back it up
        and overwrite it
        """
        # Download first: if the server cannot be reached, nothing on disk
        # is touched.
        response = self.br.open(self._server+'/info.json').read()
        with open(self._infoFile,'r') as current:
            oldInfo = current.read()
        with open(self._infoFile+"."+self.generateTimeStamp(),'w') as backup:
            backup.write(oldInfo)
        with open(self._infoFile,'w') as current:
            current.write(response)
开发者ID:venam,项目名称:ricer-helper,代码行数:41,代码来源:Updater.py
示例20: Updater
class Updater:
    """Keep a local info file in sync with ``<server>/info.json``.

    The server is expected to expose ``/hash`` (hex md5 of the current info
    file) and ``/info.json`` (the info file itself).
    """

    def __init__(self,server,infoFile):
        """Remember the server location and the local info file path."""
        self._server = server
        self._infoFile = infoFile
        self.br = URLopener()

    def hasNewInfo(self):
        """Return True if the md5 of the local info file differs from the
        hash published by the server (newlines in the reply are ignored)."""
        with open(self._infoFile,'r') as info:
            local_hash = md5.new(info.read()).hexdigest()
        remote_hash = self.br.open(self._server+'/hash').read().replace("\n","")
        return (local_hash!=remote_hash)

    def generateTimeStamp(self):
        """Return ``year_day_hour_minute`` (UTC), used to name backup files.

        NOTE(review): the month is not part of the stamp - confirm that
        colliding backup names across months are acceptable.
        """
        # One snapshot, so all fields describe the same instant.
        now = time.gmtime()
        return "%d_%d_%d_%d" % (now.tm_year, now.tm_mday, now.tm_hour, now.tm_min)

    def fetchNewInfo(self):
        """Download the new info file, back up the old one under a
        time-stamped name, then overwrite the local info file."""
        # Download first: if the server cannot be reached, nothing on disk
        # is touched.
        response = self.br.open(self._server+'/info.json').read()
        with open(self._infoFile,'r') as current:
            oldInfo = current.read()
        with open(self._infoFile+"."+self.generateTimeStamp(),'w') as backup:
            backup.write(oldInfo)
        with open(self._infoFile,'w') as current:
            current.write(response)
开发者ID:b4dtR1p,项目名称:ricer-helper,代码行数:21,代码来源:Updater.py
注:本文中的urllib.URLopener类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论