本文整理汇总了Python中test.service.SpawnedService类的典型用法代码示例。如果您正苦于以下问题:Python SpawnedService类的具体用法?Python SpawnedService怎么用?Python SpawnedService使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SpawnedService类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: open
def open(self):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs
template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
env = self.kafka_run_class_env()
# Party!
self.out("Starting...")
timeout = 5
max_timeout = 30
backoff = 1
while True:
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max_timeout)
if self.child.wait_for(r"binding to port", timeout=timeout):
break
self.child.stop()
timeout *= 2
time.sleep(backoff)
self.out("Done!")
开发者ID:arnaud-lb,项目名称:kafka-python,代码行数:31,代码来源:fixtures.py
示例2: open
def open(self):
if self.running:
self.out("Instance already running")
return
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)
log.info(" zk_chroot = %s", self.zk_chroot)
log.info(" replicas = %s", self.replicas)
log.info(" partitions = %s", self.partitions)
log.info(" tmp_dir = %s", self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data"))
# Generate configs
template = self.test_resource("kafka.properties")
properties = os.path.join(self.tmp_dir, "kafka.properties")
self.render_template(template, properties, vars(self))
# Party!
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create",
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Done!")
self.out("Starting...")
# Configure Kafka child process
args = self.kafka_run_class_args("kafka.Kafka", properties)
env = self.kafka_run_class_env()
while True:
self.child = SpawnedService(args, env)
self.child.start()
if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5):
break
self.child.stop()
self.out("Done!")
self.running = True
开发者ID:lizj3624,项目名称:kafka-python,代码行数:57,代码来源:fixtures.py
示例3: ZookeeperFixture
class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
fixture = cls(host, port)
fixture.open()
return fixture
def __init__(self, host, port):
self.host = host
self.port = port
self.tmp_dir = None
self.child = None
def out(self, message):
log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
def open(self):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs
template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
env = self.kafka_run_class_env()
# Party!
self.out("Starting...")
while True:
self.child = SpawnedService(args, env)
self.child.start()
if self.child.wait_for(r"binding to port", timeout=5):
break
self.child.stop()
self.out("Done!")
def close(self):
self.out("Stopping...")
self.child.stop()
self.child = None
self.out("Done!")
shutil.rmtree(self.tmp_dir)
开发者ID:lizj3624,项目名称:kafka-python,代码行数:56,代码来源:fixtures.py
示例4: open
def open(self):
if self.tmp_dir is None:
self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
self.tmp_dir.ensure(dir=True)
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port or '(auto)')
log.info(" tmp_dir = %s", self.tmp_dir.strpath)
# Configure Zookeeper child process
template = self.test_resource("zookeeper.properties")
properties = self.tmp_dir.join("zookeeper.properties")
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties.strpath)
env = self.kafka_run_class_env()
# Party!
timeout = 5
max_timeout = 120
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
while time.time() < end_at:
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"binding to port", timeout=timeout):
break
self.child.dump_logs()
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
backoff += 1
else:
raise RuntimeError('Failed to start Zookeeper before max_timeout')
self.out("Done!")
atexit.register(self.close)
开发者ID:kngenie,项目名称:kafka-python,代码行数:44,代码来源:fixtures.py
示例5: open
def open(self):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs
template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
env = self.kafka_run_class_env()
# Party!
timeout = 5
max_timeout = 30
backoff = 1
end_at = time.time() + max_timeout
tries = 1
while time.time() < end_at:
self.out('Attempting to start (try #%d)' % tries)
try:
os.stat(properties)
except:
log.warning('Config %s not found -- re-rendering', properties)
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"binding to port", timeout=timeout):
break
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
else:
raise Exception('Failed to start Zookeeper before max_timeout')
self.out("Done!")
atexit.register(self.close)
开发者ID:EasyPost,项目名称:kafka-python,代码行数:42,代码来源:fixtures.py
示例6: open
def open(self):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
logging.info(" host = %s", self.host)
logging.info(" port = %s", self.port)
logging.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs
template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
env = self.kafka_run_class_env()
self.child = SpawnedService(args, env)
# Party!
self.out("Starting...")
self.child.start()
self.child.wait_for(r"Snapshotting")
self.out("Done!")
开发者ID:alvarohurtado84,项目名称:kafka-python,代码行数:22,代码来源:fixtures.py
示例7: start
def start(self):
# Configure Kafka child process
properties = self.tmp_dir.join("kafka.properties")
template = self.test_resource("kafka.properties")
args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
timeout = 5
max_timeout = 120
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
while time.time() < end_at:
# We have had problems with port conflicts on travis
# so we will try a different port on each retry
# unless the fixture was passed a specific port
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(self.start_pattern, timeout=timeout):
break
self.child.dump_logs()
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
(self._client,) = self.get_clients(1, '_internal_client')
self.out("Done!")
self.running = True
开发者ID:kngenie,项目名称:kafka-python,代码行数:39,代码来源:fixtures.py
示例8: KafkaFixture
class KafkaFixture(Fixture):
@classmethod
def instance(cls, broker_id, zk_host, zk_port,
zk_chroot=None, port=None, replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
parse = urlparse(os.environ["KAFKA_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
if port is None:
port = get_open_port()
host = "127.0.0.1"
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot,
replicas=replicas, partitions=partitions)
fixture.open()
return fixture
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
self.host = host
self.port = port
self.broker_id = broker_id
self.zk_host = zk_host
self.zk_port = zk_port
self.zk_chroot = zk_chroot
self.replicas = replicas
self.partitions = partitions
self.tmp_dir = None
self.child = None
self.running = False
def out(self, message):
log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
def open(self):
if self.running:
self.out("Instance already running")
return
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)
log.info(" zk_chroot = %s", self.zk_chroot)
log.info(" replicas = %s", self.replicas)
log.info(" partitions = %s", self.partitions)
log.info(" tmp_dir = %s", self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data"))
# Generate configs
template = self.test_resource("kafka.properties")
properties = os.path.join(self.tmp_dir, "kafka.properties")
self.render_template(template, properties, vars(self))
# Party!
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create",
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Done!")
self.out("Starting...")
# Configure Kafka child process
args = self.kafka_run_class_args("kafka.Kafka", properties)
env = self.kafka_run_class_env()
timeout = 5
max_timeout = 30
backoff = 1
while True:
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max_timeout)
if self.child.wait_for(r"\[Kafka Server %d\], Started" %
self.broker_id, timeout=timeout):
break
self.child.stop()
timeout *= 2
#.........这里部分代码省略.........
开发者ID:arnaud-lb,项目名称:kafka-python,代码行数:101,代码来源:fixtures.py
示例9: ZookeeperFixture
class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
fixture = cls(host, port)
fixture.open()
return fixture
def __init__(self, host, port):
self.host = host
self.port = port
self.tmp_dir = None
self.child = None
def kafka_run_class_env(self):
env = super(ZookeeperFixture, self).kafka_run_class_env()
env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
return env
def out(self, message):
log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
def open(self):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs
template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
env = self.kafka_run_class_env()
# Party!
timeout = 5
max_timeout = 30
backoff = 1
end_at = time.time() + max_timeout
tries = 1
while time.time() < end_at:
self.out('Attempting to start (try #%d)' % tries)
try:
os.stat(properties)
except:
log.warning('Config %s not found -- re-rendering', properties)
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"binding to port", timeout=timeout):
break
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
else:
raise Exception('Failed to start Zookeeper before max_timeout')
self.out("Done!")
atexit.register(self.close)
def close(self):
if self.child is None:
return
self.out("Stopping...")
self.child.stop()
self.child = None
self.out("Done!")
shutil.rmtree(self.tmp_dir)
def __del__(self):
self.close()
开发者ID:EasyPost,项目名称:kafka-python,代码行数:83,代码来源:fixtures.py
示例10: KafkaFixture
#.........这里部分代码省略.........
self.running = False
def kafka_run_class_env(self):
env = super(KafkaFixture, self).kafka_run_class_env()
env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
return env
def out(self, message):
log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
def open(self):
if self.running:
self.out("Instance already running")
return
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" transport = %s", self.transport)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)
log.info(" zk_chroot = %s", self.zk_chroot)
log.info(" replicas = %s", self.replicas)
log.info(" partitions = %s", self.partitions)
log.info(" tmp_dir = %s", self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data"))
# Generate configs
template = self.test_resource("kafka.properties")
properties = os.path.join(self.tmp_dir, "kafka.properties")
self.render_template(template, properties, vars(self))
# Party!
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create",
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Done!")
# Configure Kafka child process
args = self.kafka_run_class_args("kafka.Kafka", properties)
env = self.kafka_run_class_env()
timeout = 5
max_timeout = 30
backoff = 1
end_at = time.time() + max_timeout
tries = 1
while time.time() < end_at:
self.out('Attempting to start (try #%d)' % tries)
try:
os.stat(properties)
except:
log.warning('Config %s not found -- re-rendering', properties)
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"\[Kafka Server %d\], Started" %
self.broker_id, timeout=timeout):
break
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
else:
raise Exception('Failed to start KafkaInstance before max_timeout')
self.out("Done!")
self.running = True
atexit.register(self.close)
def __del__(self):
self.close()
def close(self):
if not self.running:
self.out("Instance already stopped")
return
self.out("Stopping...")
self.child.stop()
self.child = None
self.out("Done!")
shutil.rmtree(self.tmp_dir)
self.running = False
开发者ID:myqq156001980,项目名称:kafka-python,代码行数:101,代码来源:fixtures.py
示例11: open
def open(self):
if self.running:
self.out("Instance already running")
return
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port or '(auto)')
log.info(" transport = %s", self.transport)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)
log.info(" zk_chroot = %s", self.zk_chroot)
log.info(" replicas = %s", self.replicas)
log.info(" partitions = %s", self.partitions)
log.info(" tmp_dir = %s", self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data"))
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create",
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Done!")
# Configure Kafka child process
properties = os.path.join(self.tmp_dir, "kafka.properties")
template = self.test_resource("kafka.properties")
args = self.kafka_run_class_args("kafka.Kafka", properties)
env = self.kafka_run_class_env()
timeout = 5
max_timeout = 30
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
while time.time() < end_at:
# We have had problems with port conflicts on travis
# so we will try a different port on each retry
# unless the fixture was passed a specific port
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"\[Kafka Server %d\], Started" %
self.broker_id, timeout=timeout):
break
self.child.dump_logs()
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
else:
raise Exception('Failed to start KafkaInstance before max_timeout')
self.out("Done!")
self.running = True
atexit.register(self.close)
开发者ID:Omnistac,项目名称:kafka-python,代码行数:74,代码来源:fixtures.py
示例12: KafkaFixture
#.........这里部分代码省略.........
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.wait() != 0 or proc.returncode != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Kafka chroot created in Zookeeper!")
def start(self):
# Configure Kafka child process
properties = self.tmp_dir.join("kafka.properties")
template = self.test_resource("kafka.properties")
args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
timeout = 5
max_timeout = 120
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
while time.time() < end_at:
# We have had problems with port conflicts on travis
# so we will try a different port on each retry
# unless the fixture was passed a specific port
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(self.start_pattern, timeout=timeout):
break
self.child.dump_logs()
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
(self._client,) = self.get_clients(1, '_internal_client')
self.out("Done!")
self.running = True
def open(self):
if self.running:
self.out("Instance already running")
return
# Create directories
if self.tmp_dir is None:
self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
self.tmp_dir.ensure(dir=True)
self.tmp_dir.ensure('logs', dir=True)
self.tmp_dir.ensure('data', dir=True)
self.out("Running local instance...")
log.info(" host = %s", self.host)
开发者ID:kngenie,项目名称:kafka-python,代码行数:67,代码来源:fixtures.py
示例13: ZookeeperFixture
class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", None)
fixture = cls(host, port)
fixture.open()
return fixture
def __init__(self, host, port, tmp_dir=None):
super(ZookeeperFixture, self).__init__()
self.host = host
self.port = port
self.tmp_dir = tmp_dir
def kafka_run_class_env(self):
env = super(ZookeeperFixture, self).kafka_run_class_env()
env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env
def out(self, message):
log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)
def open(self):
if self.tmp_dir is None:
self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
self.tmp_dir.ensure(dir=True)
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port or '(auto)')
log.info(" tmp_dir = %s", self.tmp_dir.strpath)
# Configure Zookeeper child process
template = self.test_resource("zookeeper.properties")
properties = self.tmp_dir.join("zookeeper.properties")
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties.strpath)
env = self.kafka_run_class_env()
# Party!
timeout = 5
max_timeout = 120
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
while time.time() < end_at:
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
self.render_template(template, properties, vars(self))
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"binding to port", timeout=timeout):
break
self.child.dump_logs()
self.child.stop()
timeout *= 2
time.sleep(backoff)
tries += 1
backoff += 1
else:
raise RuntimeError('Failed to start Zookeeper before max_timeout')
self.out("Done!")
atexit.register(self.close)
def close(self):
if self.child is None:
return
self.out("Stopping...")
self.child.stop()
self.child = None
self.out("Done!")
self.tmp_dir.remove()
def __del__(self):
self.close()
开发者ID:kngenie,项目名称:kafka-python,代码行数:85,代码来源:fixtures.py
注:本文中的test.service.SpawnedService类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论