本文整理汇总了Python中utils.proxy.get_proxy函数的典型用法代码示例。如果您正苦于以下问题:Python get_proxy函数的具体用法?Python get_proxy怎么用?Python get_proxy使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_proxy函数的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_get_proxy
def test_get_proxy(self):
    """
    Check get_proxy against an explicit agent config and against the
    HTTPS_PROXY environment variable.

    The first assertion expects the ``proxy_*`` keys of the agent config to
    be mapped to ``host`` / ``port`` / ``user`` / ``password``.  The second
    expects the same four fields to be parsed out of HTTPS_PROXY when the
    config is empty.
    """
    agentConfig = {
        "proxy_host": "localhost",
        "proxy_port": 4242,
        "proxy_user": "foo",
        "proxy_password": "bar"
    }
    proxy_from_config = get_proxy(agentConfig)

    self.assertEqual(proxy_from_config,
                     {
                         "host": "localhost",
                         "port": 4242,
                         "user": "foo",
                         "password": "bar",
                     })

    # Remember the previous value so this test does not leak a proxy
    # setting into whatever runs after it in the same process.
    previous_https_proxy = os.environ.get("HTTPS_PROXY")
    # user:password@host:port -- must agree with the expected dict below.
    os.environ["HTTPS_PROXY"] = "https://fooenv:barenv@google.com:4444"
    try:
        proxy_from_env = get_proxy({})
        self.assertEqual(proxy_from_env,
                         {
                             "host": "google.com",
                             "port": 4444,
                             "user": "fooenv",
                             "password": "barenv"
                         })
    finally:
        if previous_https_proxy is None:
            os.environ.pop("HTTPS_PROXY", None)
        else:
            os.environ["HTTPS_PROXY"] = previous_https_proxy
开发者ID:serverdensity,项目名称:sd-agent,代码行数:27,代码来源:test_common.py
示例2: __init__
def __init__(self, name, init_config, agentConfig, instances):
    """
    Prepare CA certificate and proxy settings, then defer to NetworkCheck.

    :param name: name of the check, forwarded to NetworkCheck
    :param init_config: check-level config; ``ca_certs`` is read from it
    :param agentConfig: agent-wide config, passed to get_proxy
    :param instances: instance configs, forwarded to NetworkCheck
    """
    self.ca_certs = init_config.get('ca_certs', get_ca_certs_path())

    agent_proxy = get_proxy(agentConfig)
    self.proxies = {"http": None, "https": None}

    if not agent_proxy:
        # No proxy from the agent: fall back to the process environment.
        self.proxies['http'] = environ.get('HTTP_PROXY', None)
        self.proxies['https'] = environ.get('HTTPS_PROXY', None)
    else:
        endpoint = "{host}:{port}".format(
            host=agent_proxy['host'],
            port=agent_proxy['port'])
        # Credentials are only embedded when both parts are set.
        if agent_proxy['user'] and agent_proxy['password']:
            endpoint = "{user}:{password}@{uri}".format(
                user=agent_proxy['user'],
                password=agent_proxy['password'],
                uri=endpoint)
        self.proxies['http'] = "http://{uri}".format(uri=endpoint)
        self.proxies['https'] = "https://{uri}".format(uri=endpoint)

    # The lowercase variable wins; the uppercase one is the fallback.
    upper_no_proxy = environ.get('NO_PROXY', None)
    self.proxies['no'] = environ.get('no_proxy', upper_no_proxy)

    NetworkCheck.__init__(self, name, init_config, agentConfig, instances)
开发者ID:cryptspirit,项目名称:dd-agent,代码行数:27,代码来源:http_check.py
示例3: __init__
def __init__(self, name, init_config, agentConfig, instances=None):
    """
    Set up the state shared by every check.

    :param name: The name of the check
    :param init_config: The config for initializing the check (may be None)
    :param agentConfig: The global configuration for the agent
    :param instances: A list of configuration objects for each instance.
    """
    from aggregator import MetricsAggregator

    # Record this check as enabled, then drop duplicate names.
    self._enabled_checks.append(name)
    self._enabled_checks = list(set(self._enabled_checks))

    self.name = name
    self.init_config = init_config if init_config else {}
    self.agentConfig = agentConfig
    self.in_developer_mode = agentConfig.get("developer_mode") and psutil
    self._internal_profiling_stats = None

    http_timeout = agentConfig.get("default_integration_http_timeout", 9)
    self.default_integration_http_timeout = float(http_timeout)
    self.hostname = agentConfig.get("checksd_hostname") or get_hostname(agentConfig)
    self.log = logging.getLogger("%s.%s" % (__name__, name))
    self.min_collection_interval = self.init_config.get(
        "min_collection_interval", self.DEFAULT_MIN_COLLECTION_INTERVAL
    )

    expiry = self.min_collection_interval + self.DEFAULT_EXPIRY_SECONDS
    self.aggregator = MetricsAggregator(
        self.hostname,
        expiry_seconds=expiry,
        formatter=agent_formatter,
        recent_point_threshold=agentConfig.get("recent_point_threshold", None),
        histogram_aggregates=agentConfig.get("histogram_aggregates"),
        histogram_percentiles=agentConfig.get("histogram_percentiles"),
    )

    # Per-run collections, all starting out empty.
    self.events = []
    self.service_checks = []
    self.instances = instances if instances else []
    self.warnings = []
    self.library_versions = None
    self.last_collection_time = defaultdict(int)
    self._instance_metadata = []
    self.svc_metadata = []
    self.historate_dict = {}

    # Proxy settings: the agent proxy applies unless init_config is None
    # or sets "use_agent_proxy" to a falsy value.
    self.proxy_settings = get_proxy(self.agentConfig)
    if init_config is None:
        self._use_proxy = False
    else:
        self._use_proxy = init_config.get("use_agent_proxy", True)

    self.proxies = {"http": None, "https": None}
    if self.proxy_settings and self._use_proxy:
        settings = self.proxy_settings
        address = "{host}:{port}".format(host=settings["host"], port=settings["port"])
        # Credentials are only embedded when both parts are set.
        if settings["user"] and settings["password"]:
            address = "{user}:{password}@{uri}".format(
                user=settings["user"], password=settings["password"], uri=address
            )
        self.proxies["http"] = "http://{uri}".format(uri=address)
        self.proxies["https"] = "https://{uri}".format(uri=address)
开发者ID:cberry777,项目名称:dd-agent,代码行数:59,代码来源:__init__.py
示例4: get_tags
def get_tags(agentConfig):
    """
    Collect the custom tags attached to this EC2 instance.

    Returns an empty list when ``collect_instance_metadata`` is falsy in
    agentConfig.  Otherwise returns a list of ``name:value`` unicode
    strings; any failure is logged and whatever was collected so far is
    returned.
    """
    if not agentConfig['collect_instance_metadata']:
        log.info("Instance metadata collection is disabled. Not collecting it.")
        return []

    tags = []

    # Switch the global socket timeout to EC2.TIMEOUT for the metadata
    # calls, remembering the previous value (best effort).
    previous_timeout = None
    try:
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(EC2.TIMEOUT)
    except Exception:
        pass

    try:
        iam_role = EC2.get_iam_role()
        credentials_url = EC2.METADATA_URL_BASE + "/iam/security-credentials/" + unicode(iam_role)
        iam_params = json.loads(urllib2.urlopen(credentials_url).read().strip())
        identity_document = json.loads(urllib2.urlopen(EC2.INSTANCE_IDENTITY_URL).read().strip())
        region = identity_document['region']

        import boto.ec2
        proxy = get_proxy(agentConfig) or {}
        connection = boto.ec2.connect_to_region(
            region,
            aws_access_key_id=iam_params['AccessKeyId'],
            aws_secret_access_key=iam_params['SecretAccessKey'],
            security_token=iam_params['Token'],
            proxy=proxy.get('host'),
            proxy_port=proxy.get('port'),
            proxy_user=proxy.get('user'),
            proxy_pass=proxy.get('password')
        )

        instance_filter = {'resource-id': EC2.metadata['instance-id']}
        tags = [u"%s:%s" % (tag.name, tag.value)
                for tag in connection.get_all_tags(instance_filter)]

        if agentConfig.get('collect_security_groups'):
            security_groups = EC2.metadata.get('security-groups')
            if security_groups:
                tags.append(u"security-group-name:{0}".format(security_groups))
    except EC2.NoIAMRole:
        log.warning(
            u"Unable to retrieve AWS EC2 custom tags: "
            u"an IAM role associated with the instance is required"
        )
    except Exception:
        log.exception("Problem retrieving custom EC2 tags")

    # Put the timeout back; when no previous value is known, use 3 seconds.
    try:
        socket.setdefaulttimeout(3 if previous_timeout is None else previous_timeout)
    except Exception:
        pass

    return tags
开发者ID:jalaziz,项目名称:dd-agent,代码行数:55,代码来源:util.py
示例5: __init__
def __init__(self, name, init_config, agentConfig, instances):
    """
    Prepare CA certificate and proxy settings, then defer to NetworkCheck.

    ``self.proxies`` is None when get_proxy returns nothing, otherwise a
    dict with ``http`` and ``https`` proxy URLs.
    """
    self.ca_certs = init_config.get("ca_certs", get_ca_certs_path())

    agent_proxy = get_proxy(agentConfig)
    if agent_proxy:
        endpoint = "{host}:{port}".format(host=agent_proxy["host"], port=agent_proxy["port"])
        # Credentials are only embedded when both parts are set.
        if agent_proxy["user"] and agent_proxy["password"]:
            endpoint = "{user}:{password}@{uri}".format(
                user=agent_proxy["user"], password=agent_proxy["password"], uri=endpoint
            )
        http_url = "http://{uri}".format(uri=endpoint)
        https_url = "https://{uri}".format(uri=endpoint)
        self.proxies = {"http": http_url, "https": https_url}
    else:
        self.proxies = None

    NetworkCheck.__init__(self, name, init_config, agentConfig, instances)
开发者ID:johnaxel,项目名称:dd-agent,代码行数:14,代码来源:http_check.py
示例6: get_tags
def get_tags(agentConfig):
    """
    Collect the custom tags attached to this EC2 instance.

    Returns an empty list when ``collect_instance_metadata`` is falsy in
    agentConfig.  Otherwise returns a list of ``name:value`` unicode
    strings; any failure is logged and whatever was collected so far is
    returned.
    """
    if not agentConfig["collect_instance_metadata"]:
        log.info("Instance metadata collection is disabled. Not collecting it.")
        return []

    tags = []
    try:
        iam_role = EC2.get_iam_role()

        # Temporary credentials for the instance's IAM role.
        credentials_url = EC2.METADATA_URL_BASE + "/iam/security-credentials/" + unicode(iam_role)
        credentials_response = requests.get(credentials_url, timeout=EC2.TIMEOUT)
        credentials_response.raise_for_status()  # Fail on 404 etc
        iam_params = credentials_response.json()

        # The identity document supplies the region to connect to.
        identity_response = requests.get(EC2.INSTANCE_IDENTITY_URL, timeout=EC2.TIMEOUT)
        identity_response.raise_for_status()
        region = identity_response.json()["region"]

        import boto.ec2
        proxy = get_proxy(agentConfig) or {}
        connection = boto.ec2.connect_to_region(
            region,
            aws_access_key_id=iam_params["AccessKeyId"],
            aws_secret_access_key=iam_params["SecretAccessKey"],
            security_token=iam_params["Token"],
            proxy=proxy.get("host"),
            proxy_port=proxy.get("port"),
            proxy_user=proxy.get("user"),
            proxy_pass=proxy.get("password"),
        )

        instance_filter = {"resource-id": EC2.metadata["instance-id"]}
        tags = [u"%s:%s" % (tag.name, tag.value)
                for tag in connection.get_all_tags(instance_filter)]

        if agentConfig.get("collect_security_groups"):
            security_groups = EC2.metadata.get("security-groups")
            if security_groups:
                tags.append(u"security-group-name:{0}".format(security_groups))
    except EC2.NoIAMRole:
        log.warning(
            u"Unable to retrieve AWS EC2 custom tags: " u"an IAM role associated with the instance is required"
        )
    except Exception:
        log.exception("Problem retrieving custom EC2 tags")

    return tags
开发者ID:n0ts,项目名称:dd-agent,代码行数:49,代码来源:cloud_metadata.py
示例7: __init__
def __init__(self, name, init_config, agentConfig, instances):
    """
    Prepare CA certificate and proxy settings, then defer to NetworkCheck.

    ``self.proxies`` is None when get_proxy returns nothing, otherwise a
    dict with ``http`` and ``https`` proxy URLs.
    """
    self.ca_certs = init_config.get('ca_certs', get_ca_certs_path())

    settings = get_proxy(agentConfig)
    if settings:
        target = "{host}:{port}".format(host=settings['host'], port=settings['port'])
        # Credentials are only embedded when both parts are set.
        with_credentials = settings['user'] and settings['password']
        if with_credentials:
            target = "{user}:{password}@{uri}".format(
                user=settings['user'],
                password=settings['password'],
                uri=target)
        self.proxies = {
            'http': "http://{uri}".format(uri=target),
            'https': "https://{uri}".format(uri=target)
        }
    else:
        self.proxies = None

    NetworkCheck.__init__(self, name, init_config, agentConfig, instances)
开发者ID:RedWhiteMiko,项目名称:dd-agent,代码行数:20,代码来源:http_check.py
示例8: validate_api_key
def validate_api_key(config):
    """
    Ask the ``/api/v1/validate`` endpoint whether the configured API key
    is accepted, and return a human-readable status string.

    :param config: agent config; ``dd_url`` and ``api_key`` are read from
                   it, and it is passed to get_proxy for proxy settings.
    :return: "API Key is valid", "API Key is invalid" (HTTP 403), or one
             of two "Unable to validate" messages on failure.
    """
    try:
        proxy = get_proxy(agentConfig=config)
        request_proxy = {}
        if proxy:
            # The user/password entries may be missing or None when the
            # proxy needs no authentication; formatting them blindly would
            # yield "None:None@host", so the credentials prefix is built
            # only from the parts that are actually set.
            user = proxy.get('user') or ''
            password = proxy.get('password') or ''
            credentials = ''
            if user:
                credentials = user
                if password:
                    credentials += ':' + password
                credentials += '@'
            request_proxy = {
                'https': "http://{credentials}{host}:{port}".format(
                    credentials=credentials,
                    host=proxy['host'],
                    port=proxy['port'])
            }
        r = requests.get("%s/api/v1/validate" % config['dd_url'].rstrip('/'),
                         params={'api_key': config.get('api_key')},
                         proxies=request_proxy, timeout=3)
        if r.status_code == 403:
            return "API Key is invalid"
        r.raise_for_status()
    except requests.RequestException:
        # Network-level or HTTP error: expected often enough not to log a trace.
        return "Unable to validate API Key. Please try again later"
    except Exception:
        log.exception("Unable to validate API Key")
        return "Unable to validate API Key (unexpected error). Please try again later"
    return "API Key is valid"
开发者ID:motusllc,项目名称:dd-agent,代码行数:21,代码来源:check_status.py
示例9: validate_api_key
def validate_api_key(config):
    """
    Ask the ``/api/v1/validate`` endpoint whether the configured API key
    is accepted, and return a human-readable status string.

    :param config: agent config; ``dd_url``, ``api_key`` and
                   ``skip_ssl_validation`` are read from it, and it is
                   passed to get_proxy for proxy settings.
    :return: "API Key is valid" on success, otherwise an "[ERROR] ..." message.
    """
    try:
        proxy = get_proxy(agentConfig=config)
        request_proxy = {}
        if proxy:
            # Any of these keys may be present but set to None.
            auth = proxy.get("user", "") or ""
            secret = proxy.get("password", "") or ""
            if auth:
                auth = (auth + ":" + secret + "@") if secret else (auth + "@")

            location = proxy.get("host", "") or ""
            port = proxy.get("port", "") or ""
            if location and port:
                location = location + ":" + str(port)

            request_proxy = {'https': "http://%s%s" % (auth, location)}

        validate_url = "%s/api/v1/validate" % config['dd_url'].rstrip('/')
        verify_ssl = not config.get('skip_ssl_validation', False)
        r = requests.get(validate_url,
                         params={'api_key': config.get('api_key')},
                         proxies=request_proxy,
                         timeout=3,
                         verify=verify_ssl)
        if r.status_code == 403:
            return "[ERROR] API Key is invalid"
        r.raise_for_status()
    except requests.RequestException:
        return "[ERROR] Unable to validate API Key. Please try again later"
    except Exception:
        log.exception("Unable to validate API Key")
        return "[ERROR] Unable to validate API Key (unexpected error). Please try again later"
    return "API Key is valid"
开发者ID:DataDog,项目名称:dd-agent,代码行数:36,代码来源:check_status.py
示例10: get_config
#.........这里部分代码省略.........
agentConfig['statsd_forward_host'] = config.get('Main', 'statsd_forward_host')
if config.has_option('Main', 'statsd_forward_port'):
agentConfig['statsd_forward_port'] = int(config.get('Main', 'statsd_forward_port'))
# Optional config
# FIXME not the prettiest code ever...
if config.has_option('Main', 'use_mount'):
agentConfig['use_mount'] = _is_affirmative(config.get('Main', 'use_mount'))
if options is not None and options.autorestart:
agentConfig['autorestart'] = True
elif config.has_option('Main', 'autorestart'):
agentConfig['autorestart'] = _is_affirmative(config.get('Main', 'autorestart'))
if config.has_option('Main', 'check_timings'):
agentConfig['check_timings'] = _is_affirmative(config.get('Main', 'check_timings'))
if config.has_option('Main', 'exclude_process_args'):
agentConfig['exclude_process_args'] = _is_affirmative(config.get('Main', 'exclude_process_args'))
try:
filter_device_re = config.get('Main', 'device_blacklist_re')
agentConfig['device_blacklist_re'] = re.compile(filter_device_re)
except ConfigParser.NoOptionError:
pass
# Dogstream config
if config.has_option("Main", "dogstream_log"):
# Older version, single log support
log_path = config.get("Main", "dogstream_log")
if config.has_option("Main", "dogstream_line_parser"):
agentConfig["dogstreams"] = ':'.join([log_path, config.get("Main", "dogstream_line_parser")])
else:
agentConfig["dogstreams"] = log_path
elif config.has_option("Main", "dogstreams"):
agentConfig["dogstreams"] = config.get("Main", "dogstreams")
if config.has_option("Main", "nagios_perf_cfg"):
agentConfig["nagios_perf_cfg"] = config.get("Main", "nagios_perf_cfg")
if config.has_option("Main", "use_curl_http_client"):
agentConfig["use_curl_http_client"] = _is_affirmative(config.get("Main", "use_curl_http_client"))
else:
# Default to False as there are some issues with the curl client and ELB
agentConfig["use_curl_http_client"] = False
if config.has_section('WMI'):
agentConfig['WMI'] = {}
for key, value in config.items('WMI'):
agentConfig['WMI'][key] = value
if (config.has_option("Main", "limit_memory_consumption") and
config.get("Main", "limit_memory_consumption") is not None):
agentConfig["limit_memory_consumption"] = int(config.get("Main", "limit_memory_consumption"))
else:
agentConfig["limit_memory_consumption"] = None
if config.has_option("Main", "skip_ssl_validation"):
agentConfig["skip_ssl_validation"] = _is_affirmative(config.get("Main", "skip_ssl_validation"))
agentConfig["collect_instance_metadata"] = True
if config.has_option("Main", "collect_instance_metadata"):
agentConfig["collect_instance_metadata"] = _is_affirmative(config.get("Main", "collect_instance_metadata"))
agentConfig["proxy_forbid_method_switch"] = False
if config.has_option("Main", "proxy_forbid_method_switch"):
agentConfig["proxy_forbid_method_switch"] = _is_affirmative(config.get("Main", "proxy_forbid_method_switch"))
agentConfig["collect_ec2_tags"] = False
if config.has_option("Main", "collect_ec2_tags"):
agentConfig["collect_ec2_tags"] = _is_affirmative(config.get("Main", "collect_ec2_tags"))
agentConfig["utf8_decoding"] = False
if config.has_option("Main", "utf8_decoding"):
agentConfig["utf8_decoding"] = _is_affirmative(config.get("Main", "utf8_decoding"))
agentConfig["gce_updated_hostname"] = False
if config.has_option("Main", "gce_updated_hostname"):
agentConfig["gce_updated_hostname"] = _is_affirmative(config.get("Main", "gce_updated_hostname"))
except ConfigParser.NoSectionError as e:
sys.stderr.write('Config file not found or incorrectly formatted.\n')
sys.exit(2)
except ConfigParser.ParsingError as e:
sys.stderr.write('Config file not found or incorrectly formatted.\n')
sys.exit(2)
except ConfigParser.NoOptionError as e:
sys.stderr.write('There are some items missing from your config file, but nothing fatal [%s]' % e)
# Storing proxy settings in the agentConfig
agentConfig['proxy_settings'] = get_proxy(agentConfig)
if agentConfig.get('ca_certs', None) is None:
agentConfig['ssl_certificate'] = get_ssl_certificate(get_os(), 'datadog-cert.pem')
else:
agentConfig['ssl_certificate'] = agentConfig['ca_certs']
return agentConfig
开发者ID:ewdurbin,项目名称:dd-agent,代码行数:101,代码来源:config.py
示例11: _is_affirmative
if config.has_option("Main", "utf8_decoding"):
agentConfig["utf8_decoding"] = _is_affirmative(config.get("Main", "utf8_decoding"))
except ConfigParser.NoSectionError, e:
sys.stderr.write('Config file not found or incorrectly formatted.\n')
sys.exit(2)
except ConfigParser.ParsingError, e:
sys.stderr.write('Config file not found or incorrectly formatted.\n')
sys.exit(2)
except ConfigParser.NoOptionError, e:
sys.stderr.write('There are some items missing from your config file, but nothing fatal [%s]' % e)
# Storing proxy settings in the agentConfig
agentConfig['proxy_settings'] = get_proxy(agentConfig)
if agentConfig.get('ca_certs', None) is None:
agentConfig['ssl_certificate'] = get_ssl_certificate(get_os(), 'datadog-cert.pem')
else:
agentConfig['ssl_certificate'] = agentConfig['ca_certs']
return agentConfig
def get_system_stats():
systemStats = {
'machine': platform.machine(),
'platform': sys.platform,
'processor': platform.processor(),
'pythonV': platform.python_version(),
}
开发者ID:Shopify,项目名称:dd-agent,代码行数:31,代码来源:config.py
示例12: __init__
def __init__(self, name, init_config, agentConfig, instances=None):
    """
    Set up the state shared by every check.

    :param name: The name of the check
    :param init_config: The config for initializing the check (may be None)
    :param agentConfig: The global configuration for the agent
    :param instances: A list of configuration objects for each instance.
    """
    from aggregator import MetricsAggregator

    # Record this check as enabled, then drop duplicate names.
    self._enabled_checks.append(name)
    self._enabled_checks = list(set(self._enabled_checks))

    self.name = name
    self.init_config = init_config if init_config else {}
    self.agentConfig = agentConfig
    self.in_developer_mode = agentConfig.get('developer_mode') and psutil
    self._internal_profiling_stats = None

    http_timeout = agentConfig.get('default_integration_http_timeout', 9)
    self.default_integration_http_timeout = float(http_timeout)
    self.hostname = agentConfig.get('checksd_hostname') or get_hostname(agentConfig)
    self.log = logging.getLogger('%s.%s' % (__name__, name))
    self.min_collection_interval = self.init_config.get(
        'min_collection_interval', self.DEFAULT_MIN_COLLECTION_INTERVAL)

    expiry = self.min_collection_interval + self.DEFAULT_EXPIRY_SECONDS
    self.aggregator = MetricsAggregator(
        self.hostname,
        expiry_seconds=expiry,
        formatter=agent_formatter,
        recent_point_threshold=agentConfig.get('recent_point_threshold', None),
        histogram_aggregates=agentConfig.get('histogram_aggregates'),
        histogram_percentiles=agentConfig.get('histogram_percentiles')
    )

    # On Linux, point psutil at the configured procfs location
    # (default '/proc', trailing slashes removed).
    if Platform.is_linux() and psutil is not None:
        configured_procfs = self.agentConfig.get('procfs_path', '/proc')
        psutil.PROCFS_PATH = configured_procfs.rstrip('/')

    # Per-run collections, all starting out empty.
    self.events = []
    self.service_checks = []
    self.instances = instances if instances else []
    self.warnings = []
    self.library_versions = None
    self.last_collection_time = defaultdict(int)
    self._instance_metadata = []
    self.svc_metadata = []
    self.historate_dict = {}

    # Proxy settings: the agent proxy applies unless init_config is None
    # or sets "use_agent_proxy" to a falsy value.
    self.proxy_settings = get_proxy(self.agentConfig)
    if init_config is None:
        self._use_proxy = False
    else:
        self._use_proxy = init_config.get("use_agent_proxy", True)

    self.proxies = {"http": None, "https": None}
    if self.proxy_settings and self._use_proxy:
        settings = self.proxy_settings
        address = "{host}:{port}".format(host=settings['host'], port=settings['port'])
        # Credentials are only embedded when both parts are set.
        if settings['user'] and settings['password']:
            address = "{user}:{password}@{uri}".format(
                user=settings['user'],
                password=settings['password'],
                uri=address)
        self.proxies['http'] = "http://{uri}".format(uri=address)
        self.proxies['https'] = "https://{uri}".format(uri=address)
开发者ID:MinerKasch,项目名称:dd-agent,代码行数:68,代码来源:__init__.py
示例13: get_tags
def get_tags(agentConfig):
    """
    Collect instance tags from EC2 and, when enabled, from OpenStack.

    Returns an empty list when ``collect_instance_metadata`` is falsy in
    agentConfig.  Otherwise the EC2 custom tags are fetched first; if
    ``EC2.is_openstack`` is True and ``openstack_use_metadata_tags`` is
    set, the tag list is then rebuilt from OpenStack's meta_data.json.
    Failures in either stage are logged and do not raise.
    """
    if not agentConfig['collect_instance_metadata']:
        log.info("Instance metadata collection is disabled. Not collecting it.")
        return []

    tags = []
    try:
        iam_role = EC2.get_iam_role()

        # Temporary credentials for the instance's IAM role.
        credentials_url = EC2.METADATA_URL_BASE + "/iam/security-credentials/" + unicode(iam_role)
        credentials_response = requests.get(credentials_url, timeout=EC2.TIMEOUT)
        credentials_response.raise_for_status()  # Fail on 404 etc
        iam_params = credentials_response.json()

        # The identity document supplies the region to connect to.
        identity_response = requests.get(EC2.INSTANCE_IDENTITY_URL, timeout=EC2.TIMEOUT)
        identity_response.raise_for_status()
        region = identity_response.json()['region']

        import boto.ec2
        proxy = get_proxy(agentConfig) or {}
        connection = boto.ec2.connect_to_region(
            region,
            aws_access_key_id=iam_params['AccessKeyId'],
            aws_secret_access_key=iam_params['SecretAccessKey'],
            security_token=iam_params['Token'],
            proxy=proxy.get('host'),
            proxy_port=proxy.get('port'),
            proxy_user=proxy.get('user'),
            proxy_pass=proxy.get('password')
        )

        instance_filter = {'resource-id': EC2.metadata['instance-id']}
        tags = [u"%s:%s" % (tag.name, tag.value)
                for tag in connection.get_all_tags(instance_filter)]

        if agentConfig.get('collect_security_groups'):
            security_groups = EC2.metadata.get('security-groups')
            if security_groups:
                tags.append(u"security-group-name:{0}".format(security_groups))
    except EC2.NoIAMRole:
        log.warning(
            u"Unable to retrieve AWS EC2 custom tags: "
            u"an IAM role associated with the instance is required"
        )
    except Exception:
        log.exception("Problem retrieving custom EC2 tags")

    if EC2.is_openstack is True and agentConfig['openstack_use_metadata_tags']:
        log.info(u"Attempting to collect tags from OpenStack meta_data.json")
        openstack_metadata_url = EC2.EC2_METADATA_HOST + "/openstack/latest/meta_data.json"
        try:
            response = requests.get(openstack_metadata_url, timeout=EC2.TIMEOUT)
            response.raise_for_status()  # Fail on 404 etc
            openstack_metadata = response.json()

            # This replaces (not extends) anything gathered from EC2 above.
            meta = openstack_metadata['meta']
            tags = [u"%s:%s" % (key, meta[key]) for key in meta]

            if 'project_id' in openstack_metadata:
                tags.append(u"project_id:%s" % openstack_metadata['project_id'])

            # Map the OS availability_zone to Datadog's use of availability-zone for UI defaults
            tags.append(u"availability-zone:%s" % openstack_metadata['availability_zone'])

            # Even though the name is set in EC2.metadata it also needs to be a tag for filters
            if 'name' not in meta:
                tags.append(u"name:%s" % openstack_metadata['name'])
        except Exception:
            log.warning(u"Problem retrieving tags from OpenStack meta_data.json")

    return tags
开发者ID:serverdensity,项目名称:sd-agent,代码行数:66,代码来源:cloud_metadata.py
注:本文中的utils.proxy.get_proxy函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论