本文整理汇总了Python中security.x509main.x509main函数的典型用法代码示例。如果您正苦于以下问题:Python x509main函数的具体用法?Python x509main怎么用?Python x509main使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了x509main函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_add_remove_autofailover
def test_add_remove_autofailover(self):
rest = RestConnection(self.master)
serv_out = self.servers[3]
shell = RemoteMachineShellConnection(serv_out)
known_nodes = ['[email protected]'+self.master.ip]
rest.create_bucket(bucket='default', ramQuotaMB=100)
rest.update_autofailover_settings(True,30)
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(self.servers[1:4])
for server in self.servers[1:4]:
rest.add_node('Administrator','password',server.ip)
known_nodes.append('[email protected]'+server.ip)
rest.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
shell.stop_server()
self.sleep(60)
shell.start_server()
self.sleep(30)
for server in self.servers:
status = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
开发者ID:EricACooper,项目名称:testrunner,代码行数:25,代码来源:x509tests.py
示例2: test_get_cluster_ca
def test_get_cluster_ca(self):
x509main(self.master).setup_master()
status, content, header = x509main(self.master)._get_cluster_ca_cert()
content = json.loads(content)
self.assertEqual(content['cert']['type'],"uploaded","Type of certificate is mismatch")
#self.assertEqual(content['cert']['pem'],"uploaded","Type of certificate is mismatch")
self.assertEqual(content['cert']['subject'],"CN=Root Authority","Common Name is incorrect")
开发者ID:EricACooper,项目名称:testrunner,代码行数:7,代码来源:x509tests.py
示例3: test_add_node_without_cert
def test_add_node_without_cert(self):
rest = RestConnection(self.master)
servs_inout = self.servers[1]
x509main(self.master).setup_master()
try:
rest.add_node('Administrator','password',servs_inout.ip)
except Exception, ex:
ex = str(ex)
expected_result = "Error adding node: " + servs_inout.ip + " to the cluster:" + self.master.ip + " - [\"Prepare join failed. Error applying node certificate. Unable to read certificate chain file\"]"
#self.assertEqual(ex,expected_result)
self.assertTrue(expected_result in ex,"Incorrect Error message in exception")
开发者ID:chethanrao,项目名称:testrunner-archive,代码行数:11,代码来源:x509tests.py
示例4: _reset_original
def _reset_original(self):
self.log.info ("Reverting to original state - regenerating certificate and removing inbox folder")
tmp_path = "/tmp/abcd.pem"
for servers in self.servers:
cli_command = "ssl-manage"
remote_client = RemoteMachineShellConnection(servers)
options = "--regenerate-cert={0}".format(tmp_path)
output, error = remote_client.execute_couchbase_cli(cli_command=cli_command, options=options,
cluster_host=servers.ip, user="Administrator",
password="password")
x509main(servers)._delete_inbox_folder()
开发者ID:membase,项目名称:testrunner,代码行数:11,代码来源:x509tests.py
示例5: test_sdk_existing_cluster
def test_sdk_existing_cluster(self):
servers_in = self.servers[1:]
self.cluster.rebalance(self.servers, servers_in, [])
rest = RestConnection(self.master)
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(self.servers,reload_cert=True)
rest.create_bucket(bucket='default', ramQuotaMB=100)
for server in self.servers:
result = self._sdk_connection(host_ip=server.ip)
self.assertTrue(result,"Cannot create a security connection with server")
开发者ID:EricACooper,项目名称:testrunner,代码行数:12,代码来源:x509tests.py
示例6: test_sdk_cluster_incorrect_cert
def test_sdk_cluster_incorrect_cert(self):
rest = RestConnection(self.master)
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(self.servers)
rest.create_bucket(bucket='default', ramQuotaMB=100)
servers_in = self.servers[1:]
self.cluster.rebalance(self.servers, servers_in, [])
root_incorrect_ca_path = x509main.CACERTFILEPATH + x509main.INCORRECT_ROOT_CERT
for server in self.servers:
result = self._sdk_connection(host_ip=server.ip,root_ca_path=root_incorrect_ca_path)
self.assertFalse(result,"Can create a security connection with incorrect root cert")
开发者ID:EricACooper,项目名称:testrunner,代码行数:13,代码来源:x509tests.py
示例7: test_get_cluster_ca_cluster
def test_get_cluster_ca_cluster(self):
servs_inout = self.servers[1]
rest = RestConnection(self.master)
x509main(self.master).setup_master()
x509main(servs_inout)._setup_node_certificates(reload_cert=False)
servs_inout = self.servers[1]
rest.add_node('Administrator','password',servs_inout.ip)
for server in self.servers[:2]:
status, content, header = x509main(server)._get_cluster_ca_cert()
content = json.loads(content)
self.assertTrue(status,"Issue while Cluster CA Cert")
self.assertEqual(content['cert']['type'],"uploaded","Type of certificate is mismatch")
self.assertEqual(content['cert']['subject'],"CN=Root Authority","Common Name is incorrect")
开发者ID:chethanrao,项目名称:testrunner-archive,代码行数:13,代码来源:x509tests.py
示例8: test_add_node_with_cert
def test_add_node_with_cert(self):
servs_inout = self.servers[1:4]
rest = RestConnection(self.master)
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(servs_inout)
known_nodes = ['[email protected]'+self.master.ip]
for server in servs_inout:
rest.add_node('Administrator','password',server.ip)
known_nodes.append('[email protected]' + server.ip)
rest.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
for server in self.servers:
status = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
开发者ID:EricACooper,项目名称:testrunner,代码行数:14,代码来源:x509tests.py
示例9: test_get_cluster_ca_self_signed
def test_get_cluster_ca_self_signed(self):
rest = RestConnection(self.master)
rest.regenerate_cluster_certificate()
status, content, header = x509main(self.master)._get_cluster_ca_cert()
content = json.loads(content)
self.assertTrue(status,"Issue while Cluster CA Cert")
self.assertEqual(content['cert']['type'],"generated","Type of certificate is mismatch")
开发者ID:EricACooper,项目名称:testrunner,代码行数:7,代码来源:x509tests.py
示例10: setUp
def setUp(self):
super(x509_upgrade, self).setUp()
self.initial_version = self.input.param("initial_version",'4.5.0-900')
self.upgrade_version = self.input.param("upgrade_version", "4.5.0-1069")
self._reset_original()
x509main(self.master)._generate_cert(self.servers)
self.ip_address = self.getLocalIPAddress()
enable_audit=self.input.param('audit',None)
if enable_audit:
Audit = audit(host=self.master)
currentState = Audit.getAuditStatus()
self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, currentState))
if not currentState:
self.log.info ("Enabling Audit ")
Audit.setAuditEnable('true')
self.sleep(30)
开发者ID:EricACooper,项目名称:testrunner,代码行数:16,代码来源:x509tests.py
示例11: test_sdk_change_ca_self_signed
def test_sdk_change_ca_self_signed(self):
rest = RestConnection(self.master)
temp_file_name = '/tmp/newcerts/orig_cert.pem'
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(self.servers)
rest.create_bucket(bucket='default', ramQuotaMB=100)
result = self._sdk_connection(host_ip=self.master.ip)
self.assertTrue(result,"Cannot create a security connection with server")
rest.regenerate_cluster_certificate()
temp_cert = rest.get_cluster_ceritificate()
temp_file = open(temp_file_name,'w')
temp_file.write(temp_cert)
temp_file.close()
result = self._sdk_connection(root_ca_path=temp_file_name,host_ip=self.master.ip)
self.assertTrue(result,"Cannot create a security connection with server")
开发者ID:EricACooper,项目名称:testrunner,代码行数:17,代码来源:x509tests.py
示例12: test_end_to_end_after_cluster
def test_end_to_end_after_cluster(self):
output, error = self._upload_cert_cli()
self._setup_all_cluster_nodes(self.servers)
servers_in = self.servers[1:]
self.cluster.rebalance(self.servers, servers_in, [])
for server in self.servers:
status = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
开发者ID:arod1987,项目名称:testrunner,代码行数:8,代码来源:x509clitest.py
示例13: upgrade_all_nodes
def upgrade_all_nodes(self):
servers_in = self.servers[1:]
self._install(self.servers)
rest_conn = RestConnection(self.master)
rest_conn.init_cluster(username='Administrator', password='password')
rest_conn.create_bucket(bucket='default', ramQuotaMB=512)
self.cluster.rebalance(self.servers, servers_in, [])
upgrade_threads = self._async_update(upgrade_version=self.upgrade_version, servers=self.servers)
for threads in upgrade_threads:
threads.join()
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(self.servers,reload_cert=True)
for server in self.servers:
result = self._sdk_connection(host_ip=server.ip)
self.assertTrue(result,"Cannot create a security connection with server")
开发者ID:EricACooper,项目名称:testrunner,代码行数:19,代码来源:x509tests.py
示例14: test_add_remove_graceful_add_back_node_with_cert
def test_add_remove_graceful_add_back_node_with_cert(self,recovery_type=None):
recovery_type = self.input.param('recovery_type')
rest = RestConnection(self.master)
known_nodes = ['[email protected]'+self.master.ip]
progress = None
count = 0
servs_inout = self.servers[1:]
serv_out = '[email protected]' + servs_inout[1].ip
rest.create_bucket(bucket='default', ramQuotaMB=100)
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(servs_inout)
for server in servs_inout:
rest.add_node('Administrator','password',server.ip)
known_nodes.append('[email protected]' + server.ip)
rest.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
for server in servs_inout:
status = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
rest.fail_over(serv_out,graceful=True)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
rest.set_recovery_type(serv_out,recovery_type)
rest.add_back_node(serv_out)
rest.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
for server in servs_inout:
status = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
开发者ID:EricACooper,项目名称:testrunner,代码行数:34,代码来源:x509tests.py
示例15: test_add_remove_add_back_node_with_cert
def test_add_remove_add_back_node_with_cert(self,rebalance=None):
rebalance = self.input.param('rebalance')
rest = RestConnection(self.master)
servs_inout = self.servers[1:3]
serv_out = '[email protected]' + servs_inout[1].ip
known_nodes = ['[email protected]'+self.master.ip]
x509main(self.master).setup_master()
x509main().setup_cluster_nodes_ssl(servs_inout)
for server in servs_inout:
rest.add_node('Administrator','password',server.ip)
known_nodes.append('[email protected]' + server.ip)
rest.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
for server in servs_inout:
status = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
rest.fail_over(serv_out,graceful=False)
if (rebalance):
rest.rebalance(known_nodes,[serv_out])
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
rest.add_node('Administrator','password',servs_inout[1].ip)
else:
rest.add_back_node(serv_out)
rest.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(rest),"Issue with rebalance")
for server in servs_inout:
response = x509main(server)._validate_ssl_login()
self.assertEqual(status,200,"Not able to login via SSL code")
开发者ID:EricACooper,项目名称:testrunner,代码行数:28,代码来源:x509tests.py
示例16: test_basic_xdcr_with_cert
def test_basic_xdcr_with_cert(self):
cluster1 = self.servers[0:2]
cluster2 = self.servers[2:4]
remote_cluster_name = 'sslcluster'
restCluster1 = RestConnection(cluster1[0])
restCluster2 = RestConnection(cluster2[0])
try:
#Setup cluster1
x509main(cluster1[0]).setup_master()
x509main(cluster1[1])._setup_node_certificates(reload_cert=False)
restCluster1.add_node('Administrator','password',cluster1[1].ip)
known_nodes = ['[email protected]'+cluster1[0].ip,'[email protected]' + cluster1[1].ip]
restCluster1.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(restCluster1),"Issue with rebalance")
restCluster1.create_bucket(bucket='default', ramQuotaMB=100)
restCluster1.remove_all_replications()
restCluster1.remove_all_remote_clusters()
#Setup cluster2
x509main(cluster2[0]).setup_master()
x509main(cluster2[1])._setup_node_certificates(reload_cert=False)
restCluster2.add_node('Administrator','password',cluster2[1].ip)
known_nodes = ['[email protected]'+cluster2[0].ip,'[email protected]' + cluster2[1].ip]
restCluster2.rebalance(known_nodes)
self.assertTrue(self.check_rebalance_complete(restCluster2),"Issue with rebalance")
restCluster2.create_bucket(bucket='default', ramQuotaMB=100)
test = x509main.CACERTFILEPATH + x509main.CACERTFILE
data = open(test, 'rb').read()
restCluster1.add_remote_cluster(cluster2[0].ip,cluster2[0].port,'Administrator','password',remote_cluster_name,certificate=data)
replication_id = restCluster1.start_replication('continuous','default',remote_cluster_name)
if replication_id is not None:
self.assertTrue(True,"Replication was not created successfully")
finally:
known_nodes = ['[email protected]'+cluster2[0].ip,'[email protected]'+cluster2[1].ip]
restCluster2.rebalance(known_nodes,['[email protected]' + cluster2[1].ip])
self.assertTrue(self.check_rebalance_complete(restCluster2),"Issue with rebalance")
restCluster2.delete_bucket()
开发者ID:EricACooper,项目名称:testrunner,代码行数:42,代码来源:x509tests.py
示例17: _setup_cluster_nodes
def _setup_cluster_nodes(self,host):
x509main(host)._create_inbox_folder(self.master)
src_chain_file = x509main.CACERTFILEPATH + "long_chain" + host.ip + ".pem"
print src_chain_file
dest_chain_file = self.install_path + x509main.CHAINFILEPATH + "/" + x509main.CHAINCERTFILE
print dest_chain_file
src_node_key = x509main.CACERTFILEPATH + host.ip + ".key"
print src_node_key
dest_node_key = self.install_path + x509main.CHAINFILEPATH + "/" + x509main.NODECAKEYFILE
print dest_node_key
x509main(host)._copy_node_key_chain_cert(host, src_chain_file, dest_chain_file)
x509main(host)._copy_node_key_chain_cert(host, src_node_key, dest_node_key)
cli_command = 'ssl-manage'
options = "--set-node-certificate"
remote_client = RemoteMachineShellConnection(host)
output, error = remote_client.execute_couchbase_cli(cli_command=cli_command, \
options=options, cluster_host="localhost", user=self.ldapUser, password=self.ldapPass)
return output, error
开发者ID:arod1987,项目名称:testrunner,代码行数:18,代码来源:x509clitest.py
示例18: _reset_original
def _reset_original(self):
self.log.info ("Reverting to original state - regenerating certificate and removing inbox folder")
for servers in self.servers:
rest = RestConnection(servers)
rest.regenerate_cluster_certificate()
x509main(servers)._delete_inbox_folder()
开发者ID:EricACooper,项目名称:testrunner,代码行数:6,代码来源:x509tests.py
示例19: setUp
def setUp(self):
super(X509clitest, self).setUp()
self.ldapUser = self.input.param('ldapuser','Administrator')
self.ldapPass = self.input.param('ldappass','password')
self.install_path = x509main()._get_install_path(self.master)
self.slave_host = ServerInfo('127.0.0.1', 22, 'root', 'couchbase')
开发者ID:arod1987,项目名称:testrunner,代码行数:6,代码来源:x509clitest.py
示例20: test_audit_reload_ca
def test_audit_reload_ca(self):
x509main(self.master).setup_master()
expectedResults = {"expires":"2049-12-31T23:59:59.000Z","subject":"CN="+self.master.ip,"ip":self.ip_address, "port":57457,"source":"ns_server", \
"user":"Administrator"}
self.checkConfig(8230, self.master, expectedResults)
开发者ID:EricACooper,项目名称:testrunner,代码行数:5,代码来源:x509tests.py
注:本文中的security.x509main.x509main函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论