本文整理汇总了Python中security.auditmain.audit函数的典型用法代码示例。如果您正苦于以下问题:Python audit函数的具体用法?Python audit怎么用?Python audit使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了audit函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_rotateIntervalCluster
def test_rotateIntervalCluster(self):
    """Verify that every node rolls its audit log over after the rotate interval.

    Test parameters (read through ``self.input.param``):
        intervalSec: rotate interval to apply.  It is added to 20 for the
            wait below, so the test needs a numeric value.
        nodes_init: how many leading entries of ``self.servers`` to check
            (default 2).

    The rotate interval that was configured before the test is restored in
    the outer ``finally`` regardless of the outcome.
    """
    intervalSec = self.input.param("intervalSec", None)
    nodes_init = self.input.param("nodes_init", 2)
    auditIns = audit(host=self.master)
    auditIns.setAuditEnable('true')
    originalInt = auditIns.getAuditRotateInterval()
    auditIns.setAuditRotateInterval(intervalSec)
    try:
        nodes = self.servers[:nodes_init]
        # Timestamp of each node's current log; it becomes part of the
        # archive file name that is looked up after the roll over.
        firstEventTime = [self.getTimeStampForFile(audit(host=node)) for node in nodes]
        self.sleep(intervalSec + 20, 'Sleep for log roll over to happen')
        for node, eventTime in zip(nodes, firstEventTime):
            shell = RemoteMachineShellConnection(node)
            # Everything after opening the shell runs under try/finally so the
            # connection is closed even when the login or the wait fails.
            try:
                rest = RestConnection(node)
                # A login is performed after the interval has elapsed;
                # presumably it generates the audit event that lands in the
                # new log file -- TODO confirm.
                rest.validateLogin(self.master.rest_username, self.master.rest_password, True, getContent=True)
                self.sleep(120, "sleeping for log file creation")
                hostname = shell.execute_command("hostname")
                self.log.info ("print firstEventTime {0}".format(eventTime))
                # hostname[0][0]: first element of the first item returned by
                # execute_command (assumed to be the first stdout line).
                archiveFile = hostname[0][0] + '-' + eventTime + "-audit.log"
                self.log.info ("Archive File Name is {0}".format(archiveFile))
                result = shell.file_exists(auditIns.pathLogFile, archiveFile)
                self.assertTrue(result, "Archive Audit.log is not created on time interval")
                self.log.info ("Validation of archive File created is True, Audit archive File is created {0}".format(archiveFile))
                result = shell.file_exists(auditIns.pathLogFile, auditIns.AUDITLOGFILENAME)
                self.assertTrue(result, "Audit.log is not created as per the roll over time specified")
            finally:
                shell.disconnect()
    finally:
        auditIns.setAuditRotateInterval(originalInt)
开发者ID:arod1987,项目名称:testrunner,代码行数:35,代码来源:auditcheckconfig.py
示例2: test_AuditEvent
def test_AuditEvent(self):
    """Toggle auditing according to the ``ops`` parameter and verify the outcome.

    Auditing is always switched on first.  For ``ops == 'disable'`` it is then
    switched off and the test asserts that no audit log file exists in the
    audit log path.  For every other value the configuration-reload event
    (``self.AUDITCONFIGRELOAD``) is validated against the current settings.
    """
    auditIns = audit(host=self.master)
    ops = self.input.param("ops", None)
    # Constructed exactly as before; the connection object itself is not used
    # by this test.
    RestConnection(self.master)
    auditIns.setAuditEnable('true')
    if ops == 'disable':
        auditIns.setAuditEnable('false')
    elif ops == 'enable':
        auditIns.setAuditEnable('true')

    if ops != 'disable':
        # Re-create the helper before reading the settings that make up the
        # expected event.
        auditIns = audit(host=self.master)
        enabled = auditIns.getAuditStatus()
        descriptorsPath = self.changePathWindows(auditIns.getAuditConfigElement('descriptors_path'))
        # [:-1] drops the last character of the reported log path
        # (presumably a trailing path separator -- TODO confirm).
        logPath = self.changePathWindows((auditIns.getAuditLogPath())[:-1])
        expectedResults = {
            "auditd_enabled": enabled,
            "descriptors_path": descriptorsPath,
            "log_path": logPath,
            "source": "internal",
            "user": "couchbase",
            "rotate_interval": 86400,
            "version": 1,
            'hostname': self.getHostName(self.master),
        }
        self.checkConfig(self.AUDITCONFIGRELOAD, self.master, expectedResults)
        return

    shell = RemoteMachineShellConnection(self.master)
    try:
        result = shell.file_exists(auditIns.getAuditLogPath(), auditIns.AUDITLOGFILENAME)
    finally:
        shell.disconnect()
    self.assertFalse(result, 'Issue with file getting create in new directory')
开发者ID:arod1987,项目名称:testrunner,代码行数:30,代码来源:auditcheckconfig.py
示例3: test_eventDisabled
def test_eventDisabled(self):
    """Disable one audit event and check that it is no longer the last event logged.

    The event id comes from the ``disableEvent`` test parameter.  It is
    written into the ``disabled`` entry of the audit configuration, an
    auto-failover settings update is issued, and ``checkLastEvent`` for that
    event id is expected to be falsy.
    """
    eventToDisable = self.input.param("disableEvent", None)
    configAudit = audit(host=self.master)
    config = configAudit.getAuditConfigElement('all')
    config['disabled'] = [eventToDisable]
    configAudit.writeFile(lines=config)
    # Issue a settings change after the event has been disabled.
    RestConnection(self.master).update_autofailover_settings(True, 120)
    stillLogged = audit(eventID=eventToDisable, host=self.master).checkLastEvent()
    self.assertFalse(stillLogged, "Event still getting printed after getting disabled")
开发者ID:arod1987,项目名称:testrunner,代码行数:11,代码来源:auditcheckconfig.py
示例4: test_createBucketClusterNodeOut
def test_createBucketClusterNodeOut(self):
    """Check audit settings and bucket-creation events across a node removal and re-add.

    Test parameters (read through ``self.input.param``):
        ops: bucket events are only generated and validated when this is
            ``'create'``.
        nodes_out: number of nodes, starting at ``self.servers[1]``, that are
            rebalanced out and later back in (default 1).

    After each rebalance the audit status, log path and rotate interval are
    compared with the values captured on the first node before the test.
    """
    ops = self.input.param("ops", None)
    nodesOut = self.input.param("nodes_out", 1)
    source = 'ns_server'
    firstNode = self.servers[0]
    secondNode = self.servers[1]
    auditFirstNode = audit(host=firstNode)
    auditFirstNode.setAuditEnable('true')
    auditSecondNode = audit(host=secondNode)
    origState = auditFirstNode.getAuditStatus()
    origLogPath = auditFirstNode.getAuditLogPath()
    origRotateInterval = auditFirstNode.getAuditRotateInterval()
    movedNodes = self.servers[1:nodesOut + 1]

    def assertSettingsUnchanged(auditNode, action):
        # Compare the node's current audit settings with the captured ones.
        self.assertEqual(auditNode.getAuditStatus(), origState, "Issues with audit state after " + action)
        self.assertEqual(auditNode.getAuditLogPath(), origLogPath, "Issues with audit log path after " + action)
        self.assertEqual(auditNode.getAuditRotateInterval(), origRotateInterval, "Issues with audit rotate interval after " + action)

    def createBucketAndCheck(rest, node, bucketName, numReplicas, user):
        # Create a bucket through `rest` and validate the audit event on `node`.
        # NOTE(review): "port" and "sessionid" look like placeholder values;
        # confirm how the validator treats them.
        expectedResults = {'bucket_name':bucketName, 'ram_quota':104857600, 'num_replicas':numReplicas,
                           'replica_index':False, 'eviction_policy':'value_only', 'type':'membase',
                           'auth_type':'sasl', "autocompaction":'false', "purge_interval":"undefined",
                           "flush_enabled":False, "num_threads":3, "source":source,
                           "user":user, "ip":self.ipAddress, "port":57457, 'sessionid':'' }
        # Floor division keeps the quota an integer on Python 3 as well;
        # for these int operands the Python 2 result is unchanged.
        rest.create_bucket(expectedResults['bucket_name'], expectedResults['ram_quota'] // 1048576,
                           expectedResults['auth_type'], 'password', expectedResults['num_replicas'],
                           '11211', 'membase', 0, expectedResults['num_threads'], 0, 'valueOnly')
        self.checkConfig(self.eventID, node, expectedResults)

    #Remove the node from cluster & check if there are any change to cluster
    self.cluster.rebalance(self.servers, [], movedNodes)
    assertSettingsUnchanged(auditFirstNode, "removing node")
    restFirstNode = RestConnection(firstNode)
    if ops == 'create':
        createBucketAndCheck(restFirstNode, firstNode, 'TestBucketRemNode', 0, self.master.rest_username)

    #Add back the Node in Cluster
    self.cluster.rebalance(self.servers, movedNodes, [])
    assertSettingsUnchanged(auditSecondNode, "adding node")
    for server in self.servers:
        rest = RestConnection(server)
        if ops == 'create':
            createBucketAndCheck(rest, server, 'TestBucket' + server.ip, 1, server.rest_username)
开发者ID:pkdevboxy,项目名称:testrunner,代码行数:53,代码来源:audittest.py
示例5: test_audit_rm_node
def test_audit_rm_node(self):
    """Rebalance nodes in and out and validate the resulting audit event.

    Indexes are created on the ``index_field`` test parameter (default
    ``"job_title"``) before the rebalances and are dropped from every bucket
    in the ``finally`` block, whatever the outcome.
    """
    # NOTE(review): the inline comment says "add node" although the test is
    # named rm_node -- confirm that 8197 is the intended event id.
    eventID = 8197 # add node
    server = self.master
    if self.input.tuq_client and "client" in self.input.tuq_client:
        server = self.input.tuq_client["client"]
    index_field = self.input.param("index_field", "job_title")
    indexes = []
    try:
        audit_reb_out = audit(eventID=eventID, host=server)
        indexes = self._create_multiple_indexes(index_field)
        servers_in = self.servers[1 : self.nodes_in]
        self.cluster.rebalance(self.servers[:1], servers_in, [], services=self.services_in)
        rebalance = self.cluster.rebalance(self.servers[:1], [], servers_in)
        expected_result = {
            "services": self.services_in,
            # The literal used to list "port" twice (8091, then 57457); the
            # later value always won, so only that one is kept.
            "port": 57457,
            "hostname": servers_in[0].ip,
            "groupUUID": "0",
            # Node name prefix restored: the scraped text had been replaced by
            # an e-mail obfuscation placeholder.
            "node": "ns_1@" + servers_in[0].ip,
            "source": "ns_server",
            "user": self.master.rest_username,
            "ip": self.getLocalIPAddress(),
        }
        self.test_min()
        audit_reb_out.checkConfig(expected_result)
        rebalance.result()
    finally:
        # De-duplicate once instead of once per bucket.
        index_names = set(indexes)
        for bucket in self.buckets:
            for index_name in index_names:
                self.run_cbq_query(query="DROP INDEX %s.%s" % (bucket.name, index_name))
开发者ID:pkdevboxy,项目名称:testrunner,代码行数:31,代码来源:tuq_cluster_ops.py
示例6: test_rotateInterval
def test_rotateInterval(self):
    """Verify that the master node rolls its audit log over after the rotate interval.

    The ``intervalSec`` test parameter is applied as the rotate interval and
    is added to 20 for the wait, so the test needs a numeric value.  After the
    wait a login is performed, and both the archived log and a fresh audit log
    are expected in the audit log directory.  The previous interval is
    restored in the outer ``finally``.
    """
    intervalSec = self.input.param("intervalSec", None)
    auditIns = audit(host=self.master)
    rest = RestConnection(self.master)
    originalInt = auditIns.getAuditRotateInterval()
    try:
        # Timestamp of the current log; it is part of the archive file name.
        firstEventTime = self.getTimeStampForFile(auditIns)
        self.log.info ("first time evetn is {0}".format(firstEventTime))
        auditIns.setAuditRotateInterval(intervalSec)
        self.sleep(intervalSec + 20, 'Sleep for log roll over to happen')
        rest.validateLogin(self.master.rest_username, self.master.rest_password, True, getContent=True)
        self.sleep(120)
        shell = RemoteMachineShellConnection(self.master)
        try:
            hostname = shell.execute_command("hostname")
            archiveFile = hostname[0][0] + '-' + firstEventTime + "-audit.log"
            self.log.info ("Archive File Name is {0}".format(archiveFile))
            result = shell.file_exists(auditIns.pathLogFile, archiveFile)
            self.assertTrue(result, "Archive Audit.log is not created on time interval")
            self.log.info ("Validation of archive File created is True, Audit archive File is created {0}".format(archiveFile))
            result = shell.file_exists(auditIns.pathLogFile, auditIns.AUDITLOGFILENAME)
            # The message describes the roll over, which is what this test
            # exercises; nothing here kills memcached.
            self.assertTrue(result, "Audit.log is not created as per the roll over time specified")
        finally:
            shell.disconnect()
    finally:
        auditIns.setAuditRotateInterval(originalInt)
开发者ID:arod1987,项目名称:testrunner,代码行数:26,代码来源:auditcheckconfig.py
示例7: test_folderMisMatchCluster
def test_folderMisMatchCluster(self):
    """Point the audit log at a folder that exists only on the first node.

    A new directory is created on ``self.servers[0]`` only, the audit log path
    is switched to it, and a bucket is then created through every server.
    The event is validated against the first server each time; a failed
    validation is logged instead of failing the test.
    """
    auditIns = audit(host=self.master)
    originalPath = auditIns.getAuditLogPath()
    newPath = originalPath + 'testFolderMisMatch'
    shell = RemoteMachineShellConnection(self.servers[0])
    try:
        shell.create_directory(newPath)
        command = 'chown couchbase:couchbase ' + newPath
        shell.execute_command(command)
    finally:
        shell.disconnect()
    auditIns.setAuditLogPath(newPath)
    # NOTE(review): `source` and `user` were never defined in this test; the
    # values below mirror what the sibling tests use for REST-driven events.
    source = 'ns_server'
    user = self.master.rest_username
    for server in self.servers:
        rest = RestConnection(server)
        #Create an Event for Bucket Creation
        expectedResults = {'name':'TestBucket ' + server.ip, 'ram_quota':536870912, 'num_replicas':1,
                           'replica_index':False, 'eviction_policy':'value_only', 'type':'membase',
                           'auth_type':'sasl', "autocompaction":'false', "purge_interval":"undefined",
                           "flush_enabled":False, "num_threads":3, "source":source,
                           "user":user, "ip":self.ipAddress, "port":57457, 'sessionid':'' }
        # Floor division keeps the quota an integer on Python 3 as well.
        rest.create_bucket(expectedResults['name'], expectedResults['ram_quota'] // 1048576,
                           expectedResults['auth_type'], 'password', expectedResults['num_replicas'],
                           '11211', 'membase', 0, expectedResults['num_threads'],
                           expectedResults['flush_enabled'], 'valueOnly')
        #Check on Events
        try:
            self.checkConfig(self.eventID, self.servers[0], expectedResults)
        except Exception:
            # Best effort by design: a failed validation is only logged.
            # `Exception` keeps that while letting KeyboardInterrupt and
            # SystemExit through.
            self.log.info ("Issue reading the file at Node {0}".format(server.ip))
开发者ID:arod1987,项目名称:testrunner,代码行数:30,代码来源:auditcheckconfig.py
示例8: test_fileRotate20MB
def test_fileRotate20MB(self):
    """Generate login events until the audit log is archived for its size.

    Logins are issued in batches of nine.  After each batch the log size, the
    elapsed time and the existence of the archive file are re-evaluated; the
    loop stops once the size limit or the time cap is reached or the archive
    appears.  The test passes when the archive file exists after a final
    30 second wait.
    """
    sizeLimit = 21089520    # size threshold compared with get_data_file_size
    timeLimit = 36000       # cap on elapsed time.time() seconds
    auditIns = audit(host=self.master)
    firstEventTime = self.getTimeStampForFile(auditIns)
    tempEventCounter = 0
    rest = RestConnection(self.master)
    shell = RemoteMachineShellConnection(self.master)
    # The shell is used throughout, so everything runs under try/finally to
    # make sure it is closed even when a remote call raises.
    try:
        filePath = auditIns.pathLogFile + auditIns.AUDITLOGFILENAME
        number = int(shell.get_data_file_size(filePath))
        hostname = shell.execute_command("hostname")
        archiveFile = hostname[0][0] + '-' + firstEventTime + "-audit.log"
        result = shell.file_exists(auditIns.pathLogFile, archiveFile)
        tempTime = 0
        starttime = time.time()
        while number < sizeLimit and tempTime < timeLimit and not result:
            for _ in range(1, 10):
                rest.validateLogin("Administrator", "password", True, getContent=True)
                tempEventCounter += 1
            number = int(shell.get_data_file_size(filePath))
            tempTime = int(time.time() - starttime)
            result = shell.file_exists(auditIns.pathLogFile, archiveFile)
        self.sleep(30)
        result = shell.file_exists(auditIns.pathLogFile, archiveFile)
    finally:
        shell.disconnect()
    self.log.info ("--------Total Event Created ---- {0}".format(tempEventCounter))
    self.assertTrue(result, "Archive Audit.log is not created on reaching 20MB threshhold")
开发者ID:arod1987,项目名称:testrunner,代码行数:26,代码来源:auditcheckconfig.py
示例9: test_role_assignment_audit
def test_role_assignment_audit(self):
    """Validate the audit event written when user roles are assigned, edited or removed.

    The ``ops`` test parameter selects the scenario (``'assign'`` by default,
    ``'edit'`` or ``'remove'``).  Any other value fails the test right away
    instead of surfacing later as an unbound event id.  Auditing is switched
    off (if it was on) and back on before the role operation is performed.
    """
    ops = self.input.param("ops",'assign')
    eventIDs = {
        'assign': rbacmain.AUDIT_ROLE_ASSIGN,
        'edit': rbacmain.AUDIT_ROLE_ASSIGN,
        'remove': rbacmain.AUDIT_REMOVE_ROLE,
    }
    if ops not in eventIDs:
        self.fail("Unsupported ops value for role audit test: {0}".format(ops))
    Audit = audit(eventID=eventIDs[ops], host=self.master)
    currentState = Audit.getAuditStatus()
    self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, currentState))
    if currentState:
        Audit.setAuditEnable('false')
    self.log.info ("Enabling Audit ")
    Audit.setAuditEnable('true')
    self.sleep(30)

    user_name = self.input.param("user_name")
    final_roles = rbacmain()._return_roles(self.user_role)
    payload = "name=" + user_name + "&roles=" + final_roles
    # The initial assignment happens in every scenario; 'edit' and 'remove'
    # then act on the user created here.
    rbacmain(self.master)._set_user_roles(user_name=self.user_id,payload=payload)

    # Fields shared by the expected event of all three scenarios.
    identity = {"identity:source":"saslauthd","identity:user":self.user_id,
                "real_userid:source":"ns_server","real_userid:user":"Administrator",
                "ip":self.ipAddress, "port":123456}

    def expectedFor(roles=None):
        # Build the expected event; name and roles are absent for 'remove'.
        results = {}
        if roles is not None:
            results["full_name"] = "RitamSharma"
            results["roles"] = roles
        results.update(identity)
        return results

    expectedResults = expectedFor(["admin"])
    if ops == 'edit':
        payload = "name=" + user_name + "&roles=" + 'admin,cluster_admin'
        rbacmain(self.master)._set_user_roles(user_name=self.user_id,payload=payload)
        expectedResults = expectedFor(["admin","cluster_admin"])
    elif ops == 'remove':
        rbacmain(self.master)._delete_user(self.user_id)
        expectedResults = expectedFor()

    fieldVerification, valueVerification = Audit.validateEvents(expectedResults)
    self.assertTrue(fieldVerification, "One of the fields is not matching")
    self.assertTrue(valueVerification, "Values for one of the fields is not matching")
开发者ID:chethanrao,项目名称:testrunner-archive,代码行数:35,代码来源:rbacTest.py
示例10: test_invalidLogPathCluster
def test_invalidLogPathCluster(self):
    """Setting the audit log path to the current path plus ``'test'`` must be rejected.

    The request is expected to report failure and to carry the
    "valid directory" error for ``logPath``.
    """
    invalidPath = audit(host=self.master).getAuditLogPath() + 'test'
    status, content = RestConnection(self.master).setAuditSettings(logPath=invalidPath)
    self.assertFalse(status, "Audit is able to set invalid path")
    logPathError = content['errors']['logPath']
    self.assertEqual(logPathError, 'The value must be a valid directory', 'No error or error changed')
开发者ID:arod1987,项目名称:testrunner,代码行数:7,代码来源:auditcheckconfig.py
示例11: test_rotateIntervalShort
def test_rotateIntervalShort(self):
    """An out-of-range rotate interval must be rejected with the range error.

    The interval comes from the ``intervalSec`` test parameter.  The second
    ``setAuditRotateInterval`` call is the one whose status and error payload
    are asserted.
    """
    interval = self.input.param("intervalSec", None)
    nodeAudit = audit(host=self.master)
    nodeAudit.setAuditRotateInterval(interval)
    # The current interval is still queried, as before; its value is not used.
    nodeAudit.getAuditRotateInterval()
    status, content = nodeAudit.setAuditRotateInterval(interval)
    self.assertFalse(status, "Audit log interval setting is <900 or > 604800")
    rotateError = content['errors']['rotateInterval']
    self.assertEqual(rotateError, 'The value of rotateInterval must be in range from 15 minutes to 7 days')
开发者ID:arod1987,项目名称:testrunner,代码行数:8,代码来源:auditcheckconfig.py
示例12: test_changeLogPath
def test_changeLogPath(self):
    """Move the audit log to a new folder and check the log file appears there.

    The folder is created (and chown-ed to couchbase) on the first
    ``nodes_init`` servers, the audit log path is switched to it, and after an
    auto-failover settings update on each of those servers the audit log file
    is looked up in the new folder.  The original path is restored in
    ``finally``.
    """
    nodes_init = self.input.param("nodes_init", 0)
    auditMaster = audit(host=self.servers[0])
    # A helper for the second server is still constructed, as before, although
    # nothing below uses it.
    audit(host=self.servers[1])
    # Remember where the log lived so it can be put back afterwards.
    originalPath = auditMaster.getAuditLogPath()
    try:
        newPath = auditMaster.getAuditLogPath() + "folder"
        targets = self.servers[:nodes_init]

        # Prepare the folder on every participating machine.
        for node in targets:
            remote = RemoteMachineShellConnection(node)
            try:
                remote.create_directory(newPath)
                remote.execute_command('chown couchbase:couchbase ' + newPath)
            finally:
                remote.disconnect()

        source = 'ns_server'
        user = self.master.rest_username
        auditMaster.setAuditLogPath(newPath)

        # Trigger an auto-failover settings update per node, then look for
        # audit.log in the new location.
        for node in targets:
            rest = RestConnection(node)
            expectedResults = {'max_nodes':1, "timeout":120, 'source':source, "user":user, 'ip':self.ipAddress, 'port':12345}
            rest.update_autofailover_settings(True, expectedResults['timeout'])
            self.sleep(120, 'Waiting for new audit file to get created')
            remote = RemoteMachineShellConnection(node)
            try:
                found = remote.file_exists(newPath, auditMaster.AUDITLOGFILENAME)
            finally:
                remote.disconnect()
            # Only an explicit False fails the test; other values pass through.
            if found is False:
                self.assertTrue(found, 'Issue with file getting create in new directory')
    finally:
        auditMaster.setAuditLogPath(originalPath)
开发者ID:arod1987,项目名称:testrunner,代码行数:43,代码来源:auditcheckconfig.py
示例13: getDefaultState
def getDefaultState(self):
    """Assert that auditing is not reported as ``'true'`` on the master node."""
    reported = audit(host=self.master).getAuditStatus()
    # Only the exact string 'true' counts as enabled.
    enabled = reported == 'true'
    self.assertFalse(enabled, "Audit is not disabled by default")
开发者ID:arod1987,项目名称:testrunner,代码行数:10,代码来源:auditcheckconfig.py
示例14: test_clusterEndToEnd
def test_clusterEndToEnd(self):
    """End-to-end audit check across a node removal, a log-path change and a re-add.

    Steps:
      1. Create a bucket through every server and validate its audit event.
      2. Rebalance out ``self.nodes_out`` nodes starting at ``self.servers[1]``.
      3. Move the audit log path on the first node and validate a new event.
      4. Prepare the same folder on the second node, rebalance the nodes back
         in and validate an event there.
      5. Switch back to the original log path and validate once more.

    The original log path is restored in ``finally``.  Failures propagate, so
    the test can actually fail.
    """
    auditNodeFirst = audit(host=self.servers[0])
    origLogPath = auditNodeFirst.getAuditLogPath()
    try:
        # Create Events on both the nodes
        for server in self.servers:
            # Connection is constructed per server; presumably this doubles as
            # a reachability check -- TODO confirm.
            RestConnection(server)
            #Create an Event for Bucket Creation
            expectedResults = self.createBucketAudit(server, "TestBucket" + server.ip)
            self.checkConfig(self.eventID, server, expectedResults)

        movedNodes = self.servers[1:self.nodes_out + 1]

        #Remove one node from the cluser
        self.cluster.rebalance(self.servers, [], movedNodes)

        #Change path on first cluster + Create Bucket Event
        newPath = auditNodeFirst.getAuditLogPath() + "changeClusterLogPath"
        self.createRemoteFolder(self.servers[0], newPath)
        auditNodeFirst.setAuditLogPath(newPath)
        expectedResults = self.createBucketAudit(self.servers[0], "TestBucketFirstNode")
        self.checkConfig(self.eventID, self.servers[0], expectedResults)

        #Add one node to the cluster
        self.createRemoteFolder(self.servers[1], newPath)
        self.cluster.rebalance(self.servers, movedNodes, [])
        expectedResults = self.createBucketAudit(self.servers[1], "TestBucketSecondNode")
        self.checkConfig(self.eventID, self.servers[1], expectedResults)

        #Change path on first cluster + Create Bucket Event
        auditNodeFirst.setAuditLogPath(origLogPath)
        expectedResults = self.createBucketAudit(self.servers[0], "TestBucketFirstNode")
        self.checkConfig(self.eventID, self.servers[0], expectedResults)
    finally:
        # Always put the log path back, without hiding a failure.
        auditNodeFirst.setAuditLogPath(origLogPath)
开发者ID:arod1987,项目名称:testrunner,代码行数:43,代码来源:auditcheckconfig.py
示例15: setUp
def setUp(self):
    """Prepare the test: record the local IP and event id, and make sure auditing is on.

    Auditing is enabled on the master node (followed by a 30 unit sleep) only
    when its reported status is falsy.
    """
    super(auditTest, self).setUp()
    self.ipAddress = self.getLocalIPAddress()
    self.eventID = self.input.param('id', None)
    masterAudit = audit(host=self.master)
    auditState = masterAudit.getAuditStatus()
    self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, auditState))
    if auditState:
        # Already enabled: nothing more to do.
        return
    self.log.info ("Enabling Audit ")
    masterAudit.setAuditEnable('true')
    self.sleep(30)
开发者ID:pkdevboxy,项目名称:testrunner,代码行数:11,代码来源:audittest.py
示例16: checkConfig
def checkConfig(self, eventID, host, expectedResults):
    """Validate the audit event ``eventID`` on ``host`` against ``expectedResults``.

    Auditing is enabled on ``host`` first when its reported status is falsy.
    The test fails if either the field check or the value check returned by
    ``validateEvents`` is falsy.
    """
    hostAudit = audit(eventID=eventID, host=host)
    auditState = hostAudit.getAuditStatus()
    # The logged IP is the master's, as in the original message.
    self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, auditState))
    if not auditState:
        self.log.info ("Enabling Audit ")
        hostAudit.setAuditEnable('true')
        self.sleep(30)
    fieldsMatch, valuesMatch = hostAudit.validateEvents(expectedResults)
    self.assertTrue(fieldsMatch, "One of the fields is not matching")
    self.assertTrue(valuesMatch, "Values for one of the fields is not matching")
开发者ID:EricACooper,项目名称:testrunner,代码行数:11,代码来源:x509tests.py
示例17: test_AuditEvent
def test_AuditEvent(self):
    """Toggle auditing according to ``ops`` and validate the configuration-reload event.

    Auditing is always enabled first.  ``ops == 'disable'`` then turns it off
    and ``ops == 'enable'`` turns it on again.  In every case the
    ``self.AUDITCONFIGRELOAD`` event is validated against the settings read
    from a fresh audit helper.
    """
    auditIns = audit(host=self.master)
    ops = self.input.param("ops", None)
    # Constructed exactly as before; the connection object itself is not used
    # by this test.
    RestConnection(self.master)
    auditIns.setAuditEnable('true')
    if ops == 'disable':
        auditIns.setAuditEnable('false')
    elif ops == 'enable':
        auditIns.setAuditEnable('true')

    # Read the settings through a new helper instance.
    auditIns = audit(host=self.master)
    enabled = auditIns.getAuditStatus()
    descriptorsPath = auditIns.getAuditConfigElement('descriptors_path')
    # [:-1] drops the last character of the reported log path
    # (presumably a trailing path separator -- TODO confirm).
    logPath = (auditIns.getAuditLogPath())[:-1]
    expectedResults = {
        "auditd_enabled": enabled,
        "descriptors_path": descriptorsPath,
        "log_path": logPath,
        "source": "internal",
        "user": "couchbase",
        "rotate_interval": 86400,
        "version": 1,
        'hostname': self.getHostName(self.master),
    }
    self.checkConfig(self.AUDITCONFIGRELOAD, self.master, expectedResults)
开发者ID:lichia,项目名称:testrunner,代码行数:22,代码来源:auditcheckconfig.py
示例18: test_enableStatusCluster
def test_enableStatusCluster(self):
    """Check that enabling and disabling audit is reflected on every node.

    Auditing is enabled through the master and must then be reported as
    enabled by each of the first ``nodes_init`` servers (default 2).  It is
    then disabled through ``self.servers[1]`` and must be reported as disabled
    by each of them.  The original state is restored in ``finally``.
    """
    nodes_init = self.input.param("nodes_init", 2)
    auditIns = audit(host=self.master)
    origState = auditIns.getAuditStatus()
    auditIns.setAuditEnable('true')

    def statusOn(server):
        # Fetch and log the audit status reported by `server`.
        tempStatus = audit(host=server).getAuditStatus()
        self.log.info ("value of current status is {0} on ip -{1}".format(tempStatus, server.ip))
        return tempStatus

    try:
        for server in self.servers[:nodes_init]:
            self.assertTrue(statusOn(server), "Audit is not enabled across the cluster")
        # Disable through a node other than the master.
        audit(host=self.servers[1]).setAuditEnable('false')
        for server in self.servers[:nodes_init]:
            # This loop checks the disabled state, so the message says so.
            self.assertFalse(statusOn(server), "Audit is not disabled across the cluster")
    finally:
        auditIns.setAuditEnable(self.returnBoolVal(origState))
开发者ID:arod1987,项目名称:testrunner,代码行数:24,代码来源:auditcheckconfig.py
示例19: run_test_queryEvents
def run_test_queryEvents(self):
    """Run a N1QL statement chosen by ``self.op_type`` and validate its audit event.

    Auditing is enabled on the master when its status is falsy, LDAP settings
    are set up, and a SELECT or INSERT is executed.  When ``self.filter`` is
    set, the filtered query runs first and ``checkFilter`` runs after
    validation.  Validation targets ``self.servers[1]`` for ``'delete'`` and
    the master otherwise.

    NOTE(review): only 'select' and 'insert' build ``expectedResults``; any
    other ``op_type`` reaches the validation with it undefined.
    """
    # The master's IP is used here; the getLocalIPAddress() alternative is
    # left disabled, as in the original.
    self.ipAddress = self.master.ip
    masterAudit = audit(host=self.master)
    auditState = masterAudit.getAuditStatus()
    self.log.info("Current status of audit on ip - {0} is {1}".format(self.master.ip, auditState))
    if not auditState:
        self.log.info("Enabling Audit ")
        masterAudit.setAuditEnable('true')
        self.sleep(30)
    self.setupLDAPSettings(RestConnection(self.master))
    query_type = self.op_type
    user = self.master.rest_username
    source = 'ns_server'

    if query_type == 'select':
        statement = "SELECT * FROM default LIMIT 100"
        if self.filter:
            self.execute_filtered_query()
        self.run_cbq_query(server=self.master, query=statement)
        expectedResults = {
            'node': '%s:%s' % (self.master.ip, self.master.port),
            'status': 'success',
            'isAdHoc': True,
            'name': 'SELECT statement',
            'real_userid': {'source': source, 'user': user},
            'statement': statement,
            'userAgent': 'Python-httplib2/$Rev: 259 $',
            'id': self.eventID,
            'description': 'A N1QL SELECT statement was executed',
        }
    elif query_type == 'insert':
        # One literal serves both the query and the expected event.
        statement = ('INSERT INTO default ( KEY, VALUE ) VALUES ("1",{ "order_id": "1", "type": '
                     '"order", "customer_id":"24601", "total_price": 30.3, "lineitems": '
                     '[ "11", "12", "13" ] })')
        if self.filter:
            self.execute_filtered_query()
        self.run_cbq_query(server=self.master, query=statement)
        expectedResults = {
            'node': '%s:%s' % (self.master.ip, self.master.port),
            'status': 'success',
            'isAdHoc': True,
            'name': 'INSERT statement',
            'real_userid': {'source': source, 'user': user},
            'statement': statement,
            'userAgent': 'Python-httplib2/$Rev: 259 $',
            'id': self.eventID,
            'description': 'A N1QL INSERT statement was executed',
        }

    target = self.servers[1] if query_type == 'delete' else self.master
    self.checkConfig(self.eventID, target, expectedResults, n1ql_audit=True)
    if self.filter:
        self.checkFilter(self.unauditedID, target)
开发者ID:ritamcouchbase,项目名称:testrunner,代码行数:48,代码来源:n1ql_upgrade.py
示例20: setUp
def setUp(self):
    """Prepare the x509 upgrade test: versions, certificates and optional auditing.

    Reads ``initial_version`` and ``upgrade_version`` from the test
    parameters, resets the original state, generates certificates for
    ``self.servers`` and records the local IP.  When the ``audit`` parameter
    is truthy, auditing is enabled on the master if its status is falsy.
    """
    super(x509_upgrade, self).setUp()
    self.initial_version = self.input.param("initial_version",'4.5.0-900')
    self.upgrade_version = self.input.param("upgrade_version", "4.5.0-1069")
    self._reset_original()
    x509main(self.master)._generate_cert(self.servers)
    self.ip_address = self.getLocalIPAddress()
    if not self.input.param('audit',None):
        # Auditing was not requested for this run.
        return
    masterAudit = audit(host=self.master)
    auditState = masterAudit.getAuditStatus()
    self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, auditState))
    if not auditState:
        self.log.info ("Enabling Audit ")
        masterAudit.setAuditEnable('true')
        self.sleep(30)
开发者ID:EricACooper,项目名称:testrunner,代码行数:16,代码来源:x509tests.py
注:本文中的security.auditmain.audit函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论