
Python urllib.URLopener Class Code Examples


This article collects and summarizes typical usage examples of Python's urllib.URLopener class. If you have been wondering what exactly URLopener does, or how it is used, the curated class examples below should help. Note that urllib.URLopener is a Python 2 API (Python 3 briefly kept it as urllib.request.URLopener before deprecating and removing it), so every snippet here is Python 2 code, and the excerpts generally elide imports and module-level helpers defined elsewhere in their source files.



The following presents 20 code examples of the URLopener class, sorted by popularity by default.
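As a primer before the catalogue, here is a minimal sketch of the class in isolation (Python 2; the URL and local file name are placeholders): URLopener.retrieve() downloads a resource to a local file and returns the file name plus the response headers.

from urllib import URLopener

opener = URLopener()
# Both the URL and the local file name below are illustrative.
filename, headers = opener.retrieve('http://example.com/data.txt', 'data.txt')
print "Saved %s (Content-Type: %s)" % (filename, headers.gettype())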

Example 1: download_package

def download_package(pkg_name, pkg_version):
  '''Download the required package. Sometimes the download can be flaky, so we use the
  retry decorator.'''
  pkg_type = 'sdist' # Don't download wheel archives for now
  # This JSON endpoint is not provided by PyPI mirrors so we always need to get this
  # from pypi.python.org.
  pkg_info = json.loads(urlopen('https://pypi.python.org/pypi/%s/json' % pkg_name).read())

  downloader = URLopener()
  for pkg in pkg_info['releases'][pkg_version]:
    if pkg['packagetype'] == pkg_type:
      filename = pkg['filename']
      expected_md5 = pkg['md5_digest']
      if os.path.isfile(filename) and check_md5sum(filename, expected_md5):
        print "File with matching md5sum already exists, skipping %s" % filename
        return True
      pkg_url = "{0}/packages/{1}".format(PYPI_MIRROR, pkg['path'])
      print "Downloading %s from %s" % (filename, pkg_url)
      downloader.retrieve(pkg_url, filename)
      # Verify the freshly downloaded file against the MD5 reported by PyPI.
      if check_md5sum(filename, expected_md5):
        return True
      else:
        print "MD5 mismatch in file %s." % filename
        return False
  print "Could not find archive to download for %s %s %s" % (
      pkg_name, pkg_version, pkg_type)
  sys.exit(1)
Author: michaelhkw, Project: Impala, Lines: 28, Source: pip_download.py
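The docstring above mentions a retry decorator that the excerpt leaves out. A minimal sketch of such a decorator, assuming simple fixed-delay retries; the names and parameters are illustrative, not Impala's actual implementation:

import time
from functools import wraps

def retry(attempts=3, delay=2):
    # Hypothetical fixed-delay retry decorator for flaky downloads.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(attempts):
                try:
                    return func(*args, **kwargs)
                except IOError:
                    if i == attempts - 1:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator

Applied as @retry() above download_package, this would re-run the whole download on transient IOErrors.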


Example 2: handle_starttag

    def handle_starttag(self, tag, attrs):
        #tmpoutput = ""
        count = 0
        global bDoWork
        #self.output = ""
        # Only parse the 'anchor' tag.
        if tag == "a":
            # Check the list of defined attributes.
            for name, value in attrs:
                # If href is defined, print it.
                if name == "href":
                    if value.endswith("jpg"):
                        #print value
                        if "http://" not in value and bDoWork:
                            bDoWork = False
                            tmpoutput = value
                            #print "Val: " + value
                            imgurl = 'http://apod.nasa.gov/apod/' + tmpoutput
                            #print "IMGURL: " + imgurl
                            filename = imgurl.split('/')[-1]
                            #print "FileName: " + filename

                            if (not os.path.isfile(filename)) and ('apod.nasa.gov' in imgurl):
                                #print "Downloading: " + filename
                                image = URLopener()
                                image.retrieve(imgurl,filename) 
                                sleep(lWaitTime)
                            elif (os.path.isfile(filename)):
                                print "Verified: " + filename
                            break
Author: DaveZahn, Project: APOD_Downloader, Lines: 30, Source: APOD_Downloader_Raspbian.py


Example 3: check_the_mangas

class check_the_mangas():
	def __init__(self,manga_name):
		self.manga_name	     = manga_name
		self.myfile		     = open(configuration.DATA_FILE,'r').read()
		self.manga_oldnumber = self.get_number()
		self.manga_nownumber = self.manga_oldnumber
		self.manga_olddate   = self.get_date  ()
		self.nowdate		 = self.today_date()
		self.br			     = URLopener()

	def get_number(self):
		return re.findall(self.manga_name+':([0-9]+):',self.myfile)[0]

	def get_date(self):
		return re.findall(self.manga_name+":"+str(self.manga_oldnumber)+':(.*)\n',self.myfile)[0]

	def today_date(self):
		return subprocess.check_output(["date","+%a-%b-%e"]).replace("\n","")

	#return 1 if the connection is working
	def test_connection(self):
		try:
			response = self.br.open(configuration.WEBSITE_TO_CHECK_CONNECTION).read()
			if configuration.KEYWORD in response:
				return 1
			else:
				return 0
		except:
			print "manga connection"
			return 0

	def exec_cmd(self):
		# Run the notification command in a forked child only; the original
		# ignored the fork result, so parent and child both ran the command.
		if os.fork() == 0:
			os.umask(0)
			os.system(configuration.MANGA_NEW_CMD.replace("MANGA",self.manga_name))
			os._exit(0)

	def run(self):
		if( self.test_connection() ):
			last_chapter = False
			try:
				while(last_chapter==False):
					to_open = "http://www.mangareader.net/" + self.manga_name + "/" + str( int(self.manga_nownumber)+1 )
					response = self.br.open( to_open).read()
					if "is not released yet" in response or "not published yet" in response or response == "":
						last_chapter = True
						if self.manga_name + ":" + str(self.manga_nownumber) not in open(configuration.DATA_FILE, "r").read():
							Thread(target=self.exec_cmd).start()
							configuration.backup()
							open(configuration.DATA_FILE,'w').write(open(configuration.DATA_FILE+".bak", "r").read().replace(self.manga_name+":"+str(self.manga_oldnumber)+":"+ self.manga_olddate, self.manga_name+":"+str(self.manga_nownumber)+":"+self.nowdate))
					else:
						print "not last chapter"
						self.manga_nownumber = str( int(self.manga_nownumber)+1 )
			except Exception, e:
				print e
				print "manga run"
				if "is not released yet. If you liked" in response:
					if self.manga_name + ":" + str(self.manga_nownumber) not in open(configuration.DATA_FILE, "r").read():
						configuration.backup()
						open(configuration.DATA_FILE,'w').write(open(configuration.DATA_FILE+".bak", "r").read().replace(self.manga_name+":"+str(self.manga_oldnumber)+":"+ self.manga_olddate, self.manga_name+":"+str(self.manga_nownumber)+":"+self.nowdate))
				pass
Author: venam, Project: updater, Lines: 60, Source: mangas.py


Example 4: command

    def command(self):
        args = list(self.args)
        method, url = args[0:2]

        if not url.startswith('http'):
            url = 'http://%s:%s%s' % (self.session.config.sys.http_host,
                                      self.session.config.sys.http_port,
                                      ('/' + url).replace('//', '/'))

        # FIXME: The python URLopener doesn't seem to support other verbs,
        #        which is really quite lame.
        method = method.upper()
        assert(method in ('GET', 'POST'))

        qv, pv = [], []
        if method == 'POST':
            which = pv
        else:
            which = qv
        for arg in args[2:]:
            if '=' in arg:
                which.append(tuple(arg.split('=', 1)))
            elif arg.upper()[0] == 'P':
                which = pv
            elif arg.upper()[0] == 'Q':
                which = qv

        if qv:
            qv = urlencode(qv)
            url += ('?' in url and '&' or '?') + qv

        # Log us in automagically!
        httpd = self.session.config.http_worker.httpd
        global HACKS_SESSION_ID
        if HACKS_SESSION_ID is None:
            HACKS_SESSION_ID = httpd.make_session_id(None)
        mailpile.auth.SetLoggedIn(None,
                                  user='Hacks plugin HTTP client',
                                  session_id=HACKS_SESSION_ID)
        cookie = httpd.session_cookie

        try:
            uo = URLopener()
            uo.addheader('Cookie', '%s=%s' % (cookie, HACKS_SESSION_ID))
            with TcpConnBroker().context(need=[TcpConnBroker.OUTGOING_HTTP]):
                if method == 'POST':
                    (fn, hdrs) = uo.retrieve(url, data=urlencode(pv))
                else:
                    (fn, hdrs) = uo.retrieve(url)
            hdrs = unicode(hdrs)
            data = open(fn, 'rb').read().strip()
            if data.startswith('{') and 'application/json' in hdrs:
                data = json.loads(data)
            return self._success('%s %s' % (method, url), result={
                'headers': hdrs.splitlines(),
                'data': data
            })
        except:
            self._ignore_exception()
            return self._error('%s %s' % (method, url))
Author: Shoply, Project: Mailpile, Lines: 60, Source: hacks.py
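The FIXME in this example is accurate: URLopener really only covers GET and POST. In Python 2, other verbs require dropping down to httplib; a minimal sketch, with host, port, path and body all placeholders:

import httplib

# URLopener cannot send PUT/DELETE; httplib.HTTPConnection can.
conn = httplib.HTTPConnection('localhost', 33411)
conn.request('PUT', '/some/path', 'key=value',
             {'Content-Type': 'application/x-www-form-urlencoded'})
resp = conn.getresponse()
print resp.status, resp.read()
conn.close()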


Example 5: unshortenurl

def unshortenurl(short):
    from urllib import URLopener
    opener = URLopener()
    try:
        opener.open(short)
    except IOError, e:
        # The bare URLopener has no redirect handler, so a 3xx response
        # raises IOError('http error', code, msg, headers); presumably the
        # intent here is to pull the target URL out of those headers.
        if len(e.args) >= 4 and e.args[1] in (301, 302, 303, 307):
            return e.args[3].getheader('Location')
        raise
    return short
Author: Crazy-Cabbage, Project: wet, Lines: 7, Source: lib.py


Example 6: read_component_sitemap

    def read_component_sitemap(self, sitemapindex_uri, sitemap_uri, sitemap, sitemapindex_is_file):
        """Read a component sitemap of a Resource List with index

        Each component must be a sitemap with the 
        """
        if (sitemapindex_is_file):
            if (not self.is_file_uri(sitemap_uri)):
                # Attempt to map URI to local file
                remote_uri = sitemap_uri
                sitemap_uri = self.mapper.src_to_dst(remote_uri)
                self.logger.info("Mapped %s to local file %s" % (remote_uri, sitemap_uri))
            else:
                # The individual sitemaps should be at a URL (scheme/server/path)
                # that the sitemapindex URL can speak authoritatively about
                if (self.check_url_authority and
                    not UrlAuthority(sitemapindex_uri).has_authority_over(sitemap_uri)):
                    raise ListBaseIndexError("The sitemapindex (%s) refers to sitemap at a location it does not have authority over (%s)" % (sitemapindex_uri,sitemap_uri))
        try:
            fh = URLopener().open(sitemap_uri)
            self.num_files += 1
        except IOError as e:
            raise ListBaseIndexError("Failed to load sitemap from %s listed in sitemap index %s (%s)" % (sitemap_uri,sitemapindex_uri,str(e)))
        # Get the Content-Length if we can (works fine for local files)
        try:
            self.content_length = int(fh.info()['Content-Length'])
            self.bytes_read += self.content_length
        except KeyError:
            # If we don't get a length then c'est la vie
            pass
        self.logger.info( "Reading sitemap from %s (%d bytes)" % (sitemap_uri,self.content_length) )
        component = sitemap.parse_xml( fh=fh, sitemapindex=False )
        # Copy resources into self, check any metadata
        for r in component:
            self.resources.add(r)
Author: EHRI, Project: resync, Lines: 34, Source: list_base_with_index.py


Example 7: getcif

def getcif(target):
    """
    Get all ICSD cif files listed in target file.
    The target file should contain tag like '# BCC'.
    """
    matgenIDs=getMatgenIDs()

    if not os.path.isdir('./ciffiles'):
        os.makedirs('./ciffiles')

    with open(target,'r') as f:
        st=f.readline()
        t1=time.time()
        while st:
            if st[0]=='#':
                tg=st.split()[-1]
                st=f.readline()
                t2=time.time()
                print "time for the %s = %2.2f sec" %(tg,t2-t1) 
                t1=time.time()
                continue
            st=st.strip()
            ind=getID(st)
            if ind in matgenIDs:
                continue #skip matgen compounds
            URL=prefix+tg+'/'+st+'/'+st+'.cif' 
            testfile=URLopener()
            try:
                testfile.retrieve(URL,'ciffiles/'+st)
            except:
                print "Error: ",URL

            st=f.readline()
Author: 3juholee, Project: materialproject_ml, Lines: 33, Source: api_icsd_aflow.py


Example 8: check_the_mangas

class check_the_mangas():
    def __init__(self,manga_name, db_conn):
        self.db_conn = db_conn
        self.manga_name = manga_name
        self.manga_oldnumber = sqlite_manager.get_manga_chapter(
            db_conn,
            manga_name)
        self.manga_nownumber = self.manga_oldnumber
        self.manga_olddate = sqlite_manager.get_manga_date(
            db_conn,
            manga_name)
        self.nowdate = self.today_date()
        self.br = URLopener()

    def today_date(self):
        return subprocess.check_output(["date","+%a-%b-%e"]).replace("\n","")

    #return 1 if the connection is working
    def test_connection(self):
        try:
            response = self.br.open(configuration.WEBSITE_TO_CHECK_CONNECTION).read()
            if configuration.KEYWORD in response:
                return 1
            else:
                return 0
        except:
            print "manga connection"
            return 0

    def exec_cmd(self):
        # Run the notification command in a forked child only; the original
        # ignored the fork result, so parent and child both ran the command.
        if os.fork() == 0:
            os.umask(0)
            os.system(configuration.MANGA_NEW_CMD.replace("MANGA",self.manga_name))
            os._exit(0)

    def run(self):
        if( self.test_connection() ):
            last_chapter = False
            try:
                while(last_chapter==False):
                    to_open = "http://www.mangareader.net/" + self.manga_name + "/" + str( int(self.manga_nownumber)+1 )
                    response = self.br.open( to_open).read()
                    if "is not released yet" in response or "not published yet" in response or response == "":
                        last_chapter = True
                        if self.manga_nownumber != sqlite_manager.get_manga_chapter(self.db_conn, self.manga_name):
                            print self.manga_name+":"+self.manga_nownumber+":"+self.nowdate
                            sqlite_manager.update_manga(self.db_conn,
                                self.manga_name,
                                self.manga_nownumber,
                                self.nowdate)
                    else:
                        self.manga_nownumber = str( int(self.manga_nownumber)+1 )
            except Exception, e:
                if "is not released yet. If you liked" in response:
                    if self.manga_nownumber != sqlite_manager.get_manga_chapter(self.db_conn,self.manga_name):
                        print self.manga_name+":"+self.manga_nownumber+":"+self.nowdate
                        sqlite_manager.update_manga(self.db_conn,
                            self.manga_name,
                            self.manga_nownumber,
                            self.nowdate)
                pass
Author: venam, Project: updater_v2, Lines: 60, Source: mangas.py


Example 9: command

    def command(self):
        args = list(self.args)
        method, url = args[0:2]

        if not url.startswith("http"):
            url = "http://%s:%s%s" % (
                self.session.config.sys.http_host,
                self.session.config.sys.http_port,
                ("/" + url).replace("//", "/"),
            )

        # FIXME: The python URLopener doesn't seem to support other verbs,
        #        which is really quite lame.
        method = method.upper()
        assert method in ("GET", "POST")

        qv, pv = [], []
        if method == "POST":
            which = pv
        else:
            which = qv
        for arg in args[2:]:
            if "=" in arg:
                which.append(tuple(arg.split("=", 1)))
            elif arg.upper()[0] == "P":
                which = pv
            elif arg.upper()[0] == "Q":
                which = qv

        if qv:
            qv = urlencode(qv)
            url += ("?" in url and "&" or "?") + qv

        # Log us in automagically!
        httpd = self.session.config.http_worker.httpd
        global HACKS_SESSION_ID
        if HACKS_SESSION_ID is None:
            HACKS_SESSION_ID = httpd.make_session_id(None)
        mailpile.auth.SetLoggedIn(None, user="Hacks plugin HTTP client", session_id=HACKS_SESSION_ID)
        cookie = httpd.session_cookie

        try:
            uo = URLopener()
            uo.addheader("Cookie", "%s=%s" % (cookie, HACKS_SESSION_ID))
            with TcpConnBroker().context(need=[TcpConnBroker.OUTGOING_HTTP], oneshot=True):
                if method == "POST":
                    (fn, hdrs) = uo.retrieve(url, data=urlencode(pv))
                else:
                    (fn, hdrs) = uo.retrieve(url)
            hdrs = unicode(hdrs)
            data = open(fn, "rb").read().strip()
            if data.startswith("{") and "application/json" in hdrs:
                data = json.loads(data)
            return self._success("%s %s" % (method, url), result={"headers": hdrs.splitlines(), "data": data})
        except:
            self._ignore_exception()
            return self._error("%s %s" % (method, url))
Author: kitsuta, Project: Mailpile, Lines: 57, Source: hacks.py


Example 10: connection

def connection():
	try:
		br = URLopener()
		response = br.open(configuration.WEBSITE_TO_CHECK_CONNECTION).read()
		if configuration.KEYWORD in response:
			return 1
		else:
			return 0
	except:
		return 0
Author: venam, Project: updater, Lines: 10, Source: mangas.py


Example 11: __init__

 def __init__(self, source, proxy = ""):
     self.source = source
     if len(proxy) > 0:
         self._opener = URLopener({"http": proxy})
     else:
         self._opener = URLopener()
     self._fetchQueue = Queue(0)
     self._fetchThread = Thread(target = self._FetchTile)
     self._fetchThread.setDaemon(True)
     self._fetchThread.start()
Author: CiderMan, Project: PGTips, Lines: 10, Source: tile_cache.py
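The constructor argument shown here is URLopener's proxies dictionary, mapping a URL scheme to a proxy URL (the same convention urllib.urlopen uses); omitting it lets urllib fall back to the *_proxy environment variables. A minimal sketch with a made-up proxy address:

from urllib import URLopener

# The proxy host and port are placeholders.
opener = URLopener({"http": "http://proxy.example.com:3128"})
opener.retrieve("http://example.com/", "index.html")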


Example 12: utGrabFromUrl

def utGrabFromUrl(p_url):
    """ Takes a file from a remote server """
    from urllib import URLopener
    try:
        l_opener = URLopener()
        l_file = l_opener.open(p_url)
        ctype = l_file.headers['Content-Type']
        l_opener.close()
        return (l_file.read(), ctype)
    except:
        return (None, 'text/x-unknown-content-type')
Author: eea, Project: Products.Reportek, Lines: 11, Source: RepUtils.py


Example 13: SlippyCache

class SlippyCache(object):
    """This is a basic map tile cache used by the SlippyPanel class
    to retrieve and store locally the images that form the map"""
    def __init__(self, source, proxy = ""):
        self.source = source
        if len(proxy) > 0:
            self._opener = URLopener({"http": proxy})
        else:
            self._opener = URLopener()
        self._fetchQueue = Queue(0)
        self._fetchThread = Thread(target = self._FetchTile)
        self._fetchThread.setDaemon(True)
        self._fetchThread.start()

    def _FetchTile(self):
        task = ""
        while task is not None:
            task = self._fetchQueue.get()
            url, fname = task
            if not os.path.isfile(fname):
                print "Getting", fname
                try:
                    self._opener.retrieve(url, "tmp.png")
                    shutil.move("tmp.png", fname)
                except IOError:
                    pass
            self._fetchQueue.task_done()

    def StartNewFetchBatch(self):
        try:
            while True:
                item = self._fetchQueue.get(False)
                self._fetchQueue.task_done()
        except Empty:
            pass

    def GetTileFilename(self, xtile, ytile, zoom):
        numTiles = 2 ** zoom
        while xtile >= numTiles:
            xtile -= numTiles
        if xtile < 0 or ytile < 0 or ytile >= numTiles:
            # Indicate that this is not a valid tile
            return None
        else:
            fname = "/".join([self.source.get_full_name(), str(zoom), str(xtile), str(ytile) + ".png"])
            if not os.path.isfile(fname):
                url = self.source.get_tile_url(xtile, ytile, zoom)
                # Ensure that the directory exists
                dname = os.path.dirname(fname)
                if not os.path.isdir(dname):
                    os.makedirs(dname)
                self._fetchQueue.put((url, fname))
            # Valid tile, though may not be present in the cache
            return fname
Author: CiderMan, Project: PGTips, Lines: 54, Source: tile_cache.py
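A hedged usage sketch of the class above; TileSource is a hypothetical stand-in for whatever source object PGTips actually passes in, reduced to the two methods SlippyCache calls:

class TileSource(object):
    # Hypothetical minimal source satisfying SlippyCache's interface.
    def get_full_name(self):
        return "osm"

    def get_tile_url(self, xtile, ytile, zoom):
        return "http://tile.openstreetmap.org/%d/%d/%d.png" % (zoom, xtile, ytile)

cache = SlippyCache(TileSource())
# Returns the cache file name immediately; the tile itself is fetched
# by the background thread if it is not already on disk.
fname = cache.GetTileFilename(1, 2, 3)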


Example 14: read

    def read(self, uri=None, resources=None, index_only=False):
        """Read sitemap from a URI including handling sitemapindexes

        If index_only is True then individual sitemaps references in a sitemapindex
        will not be read. This will result in no resources being returned and is
        useful only to read the metadata and links listed in the sitemapindex.

        Includes the subtlety that if the input URI is a local file and is a 
        sitemapindex which contains URIs for the individual sitemaps, then these
        are mapped to the filesystem also.
        """
        try:
            fh = URLopener().open(uri)
            self.num_files += 1
        except IOError as e:
            raise IOError("Failed to load sitemap/sitemapindex from %s (%s)" % (uri,str(e)))
        # Get the Content-Length if we can (works fine for local files)
        try:
            self.content_length = int(fh.info()['Content-Length'])
            self.bytes_read += self.content_length
            self.logger.debug( "Read %d bytes from %s" % (self.content_length,uri) )
        except KeyError:
            # If we don't get a length then c'est la vie
            self.logger.debug( "Read ????? bytes from %s" % (uri) )
            pass
        self.logger.info( "Read sitemap/sitemapindex from %s" % (uri) )
        s = self.new_sitemap()
        s.parse_xml(fh=fh,resources=self,capability=self.capability_name)
        # what did we read? sitemap or sitemapindex?
        if (s.parsed_index):
            # sitemapindex
            if (not self.allow_multifile):
                raise ListBaseIndexError("Got sitemapindex from %s but support for sitemapindex disabled" % (uri))
            self.logger.info( "Parsed as sitemapindex, %d sitemaps" % (len(self.resources)) )
            sitemapindex_is_file = self.is_file_uri(uri)
            if (index_only):
                # don't read the component sitemaps
                self.sitemapindex = True
                return
            # now loop over all entries to read each sitemap and add to resources
            sitemaps = self.resources
            self.resources = self.resources_class()
            self.logger.info( "Now reading %d sitemaps" % len(sitemaps.uris()) )
            for sitemap_uri in sorted(sitemaps.uris()):
                self.read_component_sitemap(uri,sitemap_uri,s,sitemapindex_is_file)
        else:
            # sitemap
            self.logger.info( "Parsed as sitemap, %d resources" % (len(self.resources)) )
Author: EHRI, Project: resync, Lines: 48, Source: list_base_with_index.py


Example 15: command

    def command(self):
        args = list(self.args)
        method, url = args[0:2]

        if not url.startswith('http'):
            url = 'http://%s:%s%s' % (self.session.config.sys.http_host,
                                      self.session.config.sys.http_port,
                                      ('/' + url).replace('//', '/'))

        # FIXME: The python URLopener doesn't seem to support other verbs,
        #        which is really quite lame.
        method = method.upper()
        assert(method in ('GET', 'POST'))

        qv, pv = [], []
        if method == 'POST':
            which = pv
        else:
            which = qv
        for arg in args[2:]:
            if '=' in arg:
                which.append(tuple(arg.split('=', 1)))
            elif arg.upper()[0] == 'P':
                which = pv
            elif arg.upper()[0] == 'Q':
                which = qv

        if qv:
            qv = urlencode(qv)
            url += ('?' in url and '&' or '?') + qv

        try:
            uo = URLopener()
            if method == 'POST':
                (fn, hdrs) = uo.retrieve(url, data=urlencode(pv))
            else:
                (fn, hdrs) = uo.retrieve(url)
            hdrs = unicode(hdrs)
            data = open(fn, 'rb').read().strip()
            if data.startswith('{') and 'application/json' in hdrs:
                data = json.loads(data)
            return self._success('%s %s' % (method, url), result={
                'headers': hdrs.splitlines(),
                'data': data
            })
        except:
            self._ignore_exception()
            return self._error('%s %s' % (method, url))
Author: bart-j, Project: Mailpile, Lines: 48, Source: hacks.py


Example 16: __init__

	def __init__(self,manga_name):
		self.manga_name	     = manga_name
		self.myfile		     = open(configuration.DATA_FILE,'r').read()
		self.manga_oldnumber = self.get_number()
		self.manga_nownumber = self.manga_oldnumber
		self.manga_olddate   = self.get_date  ()
		self.nowdate		 = self.today_date()
		self.br			     = URLopener()
Author: venam, Project: updater, Lines: 8, Source: mangas.py


Example 17: download_reports

def download_reports(years=_years, weeks=_weeks):
    '''Crawl the IMoH website and download all Excel files for the given weeks and years'''
    # Create paths for the log files and the download location
    prefix = datetime.now().strftime('./log/weeklies/%y%m%d_%H%M%S_')
    log_d = prefix + "downloads.log"
    log_f = prefix + "FAILED.log"
    base_loc = 'http://www.health.gov.il/PublicationsFiles/IWER'
    # URL object
    my_file = URLopener()

    for year in years:
        print "\n", year,
        for week in weeks:
            f = open(log_d, 'a')
            f.write('\n{year}_{week}: '.format(week=week, year=year))
            # There are many different options of paths
            options = ['{base}{week:02d}_{year}.xls'.format(base=base_loc, week=week, year=year),
                       '{base}{week}_{year}.xls'.format(base=base_loc, week=week, year=year),
                       '{base}{week:02d}_{year}.xlsx'.format(base=base_loc, week=week, year=year),
                       '{base}{week}_{year}.xlsx'.format(base=base_loc, week=week, year=year)]
            for i, o in enumerate(options):
                filetype = o.split(".")[-1]
                try:
                    # Try different paths on remote, but always save on same path locally
                    my_file.retrieve(o,
                                     './data/weeklies/{year}_{week:02d}.{ft}'.format(week=week, year=year, ft=filetype))
                    # If it succeeds, record which filetype (xls/x) was saved
                    f.write('{ft}'.format(ft=filetype))
                    # Once the download succeeds, close the log file and break the loop
                    f.close()
                    break
                except:
                    # If this option fails, write the attempt number to the log
                    f.write("{} ".format(i + 1))
                    # If all options were exhausted, it has failed.
                    if i == len(options) - 1 and week != 53:
                        print "== {year}_{week:02d} FAILED ==".format(week=week, year=year),
                        with open(log_f, 'a') as failed:
                            failed.write("{year}_{week:02d} FAILED\n".format(week=week, year=year))
                        f.write("FAILED")
                        f.close()
        f.close()
Author: DeanLa, Project: IMoH, Lines: 42, Source: __init__.py


Example 18: call_remote

    def call_remote(self,category,params):
        '''
        The Meetup API is set up such that the root URL does not
        change much other than the 'name' of the thing you call into.

        In other words, I can just use category to sprintf my way to a
        valid url, then tack on the rest of the query string specified
        in params.
        '''
        url = self.root_url
        url = url % (category)
        # Every call has to include key
        url = url + "?" + params + "&key=" + self.key
        client = URLopener()
        request = client.open(url)
        raw_str = request.read()
        results = json.loads(raw_str)
        # Let the caller interpret the results of the call. Both the
        # meta info and the results are passed back
        return results
Author: PhillyPUG, Project: phillypug, Lines: 20, Source: meetup.py


Example 19: __init__

class Updater:
    """
    takes a server location and an info file as parameters in the constructor;
    it will use this server to fetch the new information. The server should
    expose a /hash and an /info.json path.
    """
    def __init__(self,server,infoFile):
        self._server = server
        self._infoFile = infoFile
        self.br = URLopener()

    """
    hasNewInfo :: Boolean
    compare the local info file hash with the one found on the server
    and returns true if they are different
    """
    def hasNewInfo(self):
        f = open(self._infoFile,'r').read()
        m = md5.new(f).hexdigest()
        response = self.br.open(self._server+'/hash').read()
        response = response.replace("\n","")
        return (m!=response)

    """
    generateTimeStamp :: String
    returns a string that is used to timestamp old config backup files
    """
    def generateTimeStamp(self):
        return str(time.gmtime().tm_year)+"_"+str(time.gmtime().tm_mday)+"_"+str(time.gmtime().tm_hour)+"_"+str(time.gmtime().tm_min)

    """
    fetchNewInfo :: Void
    it will download the info file from the server
    use the timestamp to back it up
    and overwrite it
    """
    def fetchNewInfo(self):
        response = self.br.open(self._server+'/info.json').read()
        oldInfo = open(self._infoFile,'r').read()
        open(self._infoFile+"."+self.generateTimeStamp(),'w').write(oldInfo)
        open(self._infoFile,'w').write(response)
Author: venam, Project: ricer-helper, Lines: 41, Source: Updater.py
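A hedged usage sketch of the class above; the server URL and info file name are placeholders:

u = Updater("http://config.example.com", "info.json")
if u.hasNewInfo():
    # Backs up info.json with a timestamp suffix, then overwrites it.
    u.fetchNewInfo()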


Example 20: __init__

class Updater:
    def __init__(self,server,infoFile):
        self._server = server
        self._infoFile = infoFile
        self.br = URLopener()

    def hasNewInfo(self):
        f = open(self._infoFile,'r').read()
        m = md5.new(f).hexdigest()
        response = self.br.open(self._server+'/hash').read()
        response = response.replace("\n","")
        return (m!=response)

    def generateTimeStamp(self):
        return str(time.gmtime().tm_year)+"_"+str(time.gmtime().tm_mday)+"_"+str(time.gmtime().tm_hour)+"_"+str(time.gmtime().tm_min)

    def fetchNewInfo(self):
        response = self.br.open(self._server+'/info.json').read()
        oldInfo = open(self._infoFile,'r').read()
        open(self._infoFile+"."+self.generateTimeStamp(),'w').write(oldInfo)
        open(self._infoFile,'w').write(response)
Author: b4dtR1p, Project: ricer-helper, Lines: 21, Source: Updater.py



Note: The urllib.URLopener class examples in this article were compiled by 纯净天空 from GitHub, MSDocs and other source-code and documentation platforms. The snippets come from open-source projects and their copyright remains with the original authors; consult each project's license before using or redistributing them, and do not republish without permission.

