• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python security.auditmain.audit函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中security.auditmain.audit函数的典型用法代码示例。如果您正苦于以下问题：Python audit函数的具体用法？Python audit怎么用？Python audit使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了audit函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_rotateIntervalCluster

    def test_rotateIntervalCluster(self):
        """Check that audit.log is archived on each node after the rotate interval.

        Test inputs:
            intervalSec: rotate interval (seconds) applied through the master node.
            nodes_init:  number of leading entries of ``self.servers`` to verify
                         (defaults to 2).

        For every verified node the test expects an archive named
        ``<hostname>-<timestamp>-audit.log`` and a current audit log file in
        ``auditIns.pathLogFile``. The rotate interval read before the change
        is written back when the test ends, whether it passed or failed.
        """
        intervalSec = self.input.param("intervalSec", None)
        nodes_init = self.input.param("nodes_init", 2)
        auditIns = audit(host=self.master)
        auditIns.setAuditEnable('true')
        originalInt = auditIns.getAuditRotateInterval()
        auditIns.setAuditRotateInterval(intervalSec)
        nodes = self.servers[:nodes_init]

        try:
            # One timestamp per node, taken before the roll-over; it becomes
            # part of the expected archive file name below.
            firstEventTime = [self.getTimeStampForFile(audit(host=node)) for node in nodes]

            self.sleep(intervalSec + 20, 'Sleep for log roll over to happen')

            for node, eventTime in zip(nodes, firstEventTime):
                shell = RemoteMachineShellConnection(node)
                # Everything after opening the shell runs under try/finally so
                # the connection is released even if the login or sleep fails.
                try:
                    rest = RestConnection(node)
                    # NOTE(review): the login result is unused; presumably the
                    # call only exists to generate an audit event - confirm.
                    rest.validateLogin(self.master.rest_username, self.master.rest_password, True, getContent=True)
                    self.sleep(120, "sleeping for log file creation")
                    hostname = shell.execute_command("hostname")
                    self.log.info("print firstEventTime {0}".format(eventTime))
                    archiveFile = hostname[0][0] + '-' + eventTime + "-audit.log"
                    self.log.info("Archive File Name is {0}".format(archiveFile))
                    result = shell.file_exists(auditIns.pathLogFile, archiveFile)
                    self.assertTrue(result, "Archive Audit.log is not created on time interval")
                    self.log.info("Validation of archive File created is True, Audit archive File is created {0}".format(archiveFile))
                    result = shell.file_exists(auditIns.pathLogFile, auditIns.AUDITLOGFILENAME)
                    self.assertTrue(result, "Audit.log is not created as per the roll over time specified")
                finally:
                    shell.disconnect()
        finally:
            auditIns.setAuditRotateInterval(originalInt)
开发者ID:arod1987,项目名称:testrunner,代码行数:35,代码来源:auditcheckconfig.py


示例2: test_AuditEvent

    def test_AuditEvent(self):
        """Toggle auditing on the master and verify the resulting state.

        Test input ``ops`` selects the scenario:
            'disable' - audit is switched off and the audit log file must not
                        exist in the configured log path.
            anything else - audit stays on (explicitly re-enabled for
                        'enable') and the ``AUDITCONFIGRELOAD`` event is
                        validated against the current audit configuration.
        """
        auditIns = audit(host=self.master)
        ops = self.input.param("ops", None)
        # NOTE(review): this connection is never used; it is kept because
        # constructing it may have side effects - confirm before removing.
        rest = RestConnection(self.master)
        auditIns.setAuditEnable('true')

        if ops == 'disable':
            auditIns.setAuditEnable('false')
            shell = RemoteMachineShellConnection(self.master)
            try:
                result = shell.file_exists(auditIns.getAuditLogPath(), auditIns.AUDITLOGFILENAME)
            finally:
                shell.disconnect()
            self.assertFalse(result, 'Audit log file exists although audit is disabled')
        else:
            if ops == 'enable':
                auditIns.setAuditEnable('true')
            # A fresh audit object is used to read the configuration back.
            auditIns = audit(host=self.master)
            expectedResults = {"auditd_enabled":auditIns.getAuditStatus(),
                               "descriptors_path":self.changePathWindows(auditIns.getAuditConfigElement('descriptors_path')),
                               # [:-1] drops the last character of the log path
                               # (presumably a trailing separator - confirm).
                               "log_path":self.changePathWindows((auditIns.getAuditLogPath())[:-1]), "source":"internal",
                               "user":"couchbase", "rotate_interval":86400, "version":1, 'hostname':self.getHostName(self.master)}
            self.checkConfig(self.AUDITCONFIGRELOAD, self.master, expectedResults)
开发者ID:arod1987,项目名称:testrunner,代码行数:30,代码来源:auditcheckconfig.py


示例3: test_eventDisabled

 def test_eventDisabled(self):
     """Disable one audit event id and verify it is no longer logged.

     The id comes from test input ``disableEvent``. It is written into the
     ``disabled`` list of the audit configuration, an auto-failover settings
     update is then issued, and ``checkLastEvent`` for that id must be falsy.
     """
     event_id = self.input.param("disableEvent", None)

     # Rewrite the audit configuration with the event marked as disabled.
     config_audit = audit(host=self.master)
     settings = config_audit.getAuditConfigElement('all')
     settings['disabled'] = [event_id]
     config_audit.writeFile(lines=settings)

     # Perform an operation after the configuration change.
     RestConnection(self.master).update_autofailover_settings(True, 120)

     event_audit = audit(eventID=event_id, host=self.master)
     self.assertFalse(event_audit.checkLastEvent(), "Event still getting printed after getting disabled")
开发者ID:arod1987,项目名称:testrunner,代码行数:11,代码来源:auditcheckconfig.py


示例4: test_createBucketClusterNodeOut

    def test_createBucketClusterNodeOut(self):
        """Verify audit settings and bucket-creation events across node removal and re-add.

        Test inputs:
            ops:       when 'create', buckets are created and their audit
                       events validated with ``checkConfig``.
            nodes_out: number of nodes (starting at ``self.servers[1]``) to
                       rebalance out and back in (defaults to 1).

        Audit status, log path and rotate interval are captured on the first
        node and must be unchanged on it after the removal, and must match on
        the second node after it is added back.
        """
        ops = self.input.param("ops", None)
        nodesOut = self.input.param("nodes_out", 1)
        source = 'ns_server'
        user = self.master.rest_username

        firstNode = self.servers[0]
        secondNode = self.servers[1]
        auditFirstNode = audit(host=firstNode)
        auditFirstNode.setAuditEnable('true')
        auditSecondNode = audit(host=secondNode)

        origState = auditFirstNode.getAuditStatus()
        origLogPath = auditFirstNode.getAuditLogPath()
        origRotateInterval = auditFirstNode.getAuditRotateInterval()

        def createBucketAndCheck(rest, node, bucketName, numReplicas, userName):
            """Create a bucket via ``rest`` and validate the audit event on ``node``."""
            expectedResults = {'bucket_name':bucketName, 'ram_quota':104857600, 'num_replicas':numReplicas,
                               'replica_index':False, 'eviction_policy':'value_only', 'type':'membase',
                               'auth_type':'sasl', "autocompaction":'false', "purge_interval":"undefined",
                               "flush_enabled":False, "num_threads":3, "source":source,
                               "user":userName, "ip":self.ipAddress, "port":57457, 'sessionid':'' }
            # Floor division keeps the quota in MB an integer on Python 3 as
            # well (plain "/" would pass 100.0).
            rest.create_bucket(expectedResults['bucket_name'], expectedResults['ram_quota'] // 1048576,
                               expectedResults['auth_type'], 'password', expectedResults['num_replicas'],
                               '11211', 'membase', 0, expectedResults['num_threads'], 0, 'valueOnly')
            self.checkConfig(self.eventID, node, expectedResults)

        #Remove the node from cluster & check if there are any change to cluster
        self.cluster.rebalance(self.servers, [], self.servers[1:nodesOut + 1])
        self.assertEqual(auditFirstNode.getAuditStatus(), origState, "Issues with audit state after removing node")
        self.assertEqual(auditFirstNode.getAuditLogPath(), origLogPath, "Issues with audit log path after removing node")
        self.assertEqual(auditFirstNode.getAuditRotateInterval(), origRotateInterval, "Issues with audit rotate interval after removing node")

        restFirstNode = RestConnection(firstNode)
        if ops == 'create':
            createBucketAndCheck(restFirstNode, firstNode, 'TestBucketRemNode', 0, user)

        #Add back the Node in Cluster
        self.cluster.rebalance(self.servers, self.servers[1:nodesOut + 1], [])
        self.assertEqual(auditSecondNode.getAuditStatus(), origState, "Issues with audit state after adding node")
        self.assertEqual(auditSecondNode.getAuditLogPath(), origLogPath, "Issues with audit log path after adding node")
        self.assertEqual(auditSecondNode.getAuditRotateInterval(), origRotateInterval, "Issues with audit rotate interval after adding node")

        for server in self.servers:
            rest = RestConnection(server)
            if ops == 'create':
                createBucketAndCheck(rest, server, 'TestBucket' + server.ip, 1, server.rest_username)
开发者ID:pkdevboxy,项目名称:testrunner,代码行数:53,代码来源:audittest.py


示例5: test_audit_rm_node

 def test_audit_rm_node(self):
     """Rebalance nodes in and out again, then validate the audit event.

     The audit is read from ``self.master`` unless ``self.input.tuq_client``
     provides a "client" entry. Indexes created on test input ``index_field``
     (default "job_title") are dropped from every bucket at the end, even
     when the test fails.
     """
     # NOTE(review): the original comment labels 8197 as "add node" although
     # this test is about removing a node - confirm the event id.
     eventID = 8197
     server = self.master
     if self.input.tuq_client and "client" in self.input.tuq_client:
         server = self.input.tuq_client["client"]
     index_field = self.input.param("index_field", "job_title")
     indexes = []
     try:
         audit_reb_out = audit(eventID=eventID, host=server)
         indexes = self._create_multiple_indexes(index_field)
         servers_in = self.servers[1 : self.nodes_in]
         self.cluster.rebalance(self.servers[:1], servers_in, [], services=self.services_in)
         rebalance = self.cluster.rebalance(self.servers[:1], [], servers_in)
         expected_result = {
             "services": self.services_in,
             "hostname": servers_in[0].ip,
             "groupUUID": "0",
             # Couchbase node names have the form ns_1@<ip>; the previous
             # literal was an e-mail-obfuscation placeholder.
             "node": "ns_1@" + servers_in[0].ip,
             "source": "ns_server",
             "user": self.master.rest_username,
             "ip": self.getLocalIPAddress(),
             # The literal used to list "port" twice (8091, then 57457); only
             # the last value ever took effect, so that one is kept.
             "port": 57457,
         }
         self.test_min()
         audit_reb_out.checkConfig(expected_result)
         rebalance.result()
     finally:
         for bucket in self.buckets:
             for index_name in set(indexes):
                 self.run_cbq_query(query="DROP INDEX %s.%s" % (bucket.name, index_name))
开发者ID:pkdevboxy,项目名称:testrunner,代码行数:31,代码来源:tuq_cluster_ops.py


示例6: test_rotateInterval

 def test_rotateInterval(self):
     """Check that the master archives audit.log after the rotate interval.

     The interval is taken from test input ``intervalSec``. After waiting for
     the roll-over, both ``<hostname>-<timestamp>-audit.log`` and the current
     audit log file must exist in ``pathLogFile``. The interval read at the
     start is written back on exit.
     """
     intervalSec = self.input.param("intervalSec", None)
     master_audit = audit(host=self.master)
     rest = RestConnection(self.master)
     saved_interval = master_audit.getAuditRotateInterval()
     try:
         stamp = self.getTimeStampForFile(master_audit)
         self.log.info("first time evetn is {0}".format(stamp))
         master_audit.setAuditRotateInterval(intervalSec)
         self.sleep(intervalSec + 20, 'Sleep for log roll over to happen')
         # The login result itself is not inspected.
         _status, _content = rest.validateLogin(
             self.master.rest_username, self.master.rest_password, True, getContent=True)
         self.sleep(120)

         shell = RemoteMachineShellConnection(self.master)
         try:
             host_output = shell.execute_command("hostname")
             archive_name = "-".join([host_output[0][0], stamp, "audit.log"])
             self.log.info("Archive File Name is {0}".format(archive_name))

             archived = shell.file_exists(master_audit.pathLogFile, archive_name)
             self.assertTrue(archived, "Archive Audit.log is not created on time interval")
             self.log.info("Validation of archive File created is True, Audit archive File is created {0}".format(archive_name))

             current = shell.file_exists(master_audit.pathLogFile, master_audit.AUDITLOGFILENAME)
             self.assertTrue(current, "Audit.log is not created when memcached server is killed")
         finally:
             shell.disconnect()
     finally:
         master_audit.setAuditRotateInterval(saved_interval)
开发者ID:arod1987,项目名称:testrunner,代码行数:26,代码来源:auditcheckconfig.py


示例7: test_folderMisMatchCluster

    def test_folderMisMatchCluster(self):
        """Point the audit log at a folder that exists only on the first node.

        The folder is created on ``self.servers[0]`` only; the audit log path
        is then switched to it and a bucket is created through every server.
        The bucket-creation event is validated on the first node. Validation
        problems are logged rather than raised (best-effort, as before). The
        original log path is written back at the end.
        """
        auditIns = audit(host=self.master)
        originalPath = auditIns.getAuditLogPath()
        newPath = originalPath + 'testFolderMisMatch'
        # Same source/user values the sibling bucket-creation tests expect.
        source = 'ns_server'
        user = self.master.rest_username

        shell = RemoteMachineShellConnection(self.servers[0])
        try:
            shell.create_directory(newPath)
            command = 'chown couchbase:couchbase ' + newPath
            shell.execute_command(command)
        finally:
            shell.disconnect()

        try:
            auditIns.setAuditLogPath(newPath)

            for server in self.servers:
                rest = RestConnection(server)
                #Create an Event for Bucket Creation
                expectedResults = {'name':'TestBucket ' + server.ip, 'ram_quota':536870912, 'num_replicas':1,
                                   'replica_index':False, 'eviction_policy':'value_only', 'type':'membase',
                                   'auth_type':'sasl', "autocompaction":'false', "purge_interval":"undefined",
                                   "flush_enabled":False, "num_threads":3, "source":source,
                                   "user":user, "ip":self.ipAddress, "port":57457, 'sessionid':'' }
                # Floor division keeps the quota in MB an integer on Python 3.
                rest.create_bucket(expectedResults['name'], expectedResults['ram_quota'] // 1048576,
                                   expectedResults['auth_type'], 'password', expectedResults['num_replicas'],
                                   '11211', 'membase', 0, expectedResults['num_threads'],
                                   expectedResults['flush_enabled'], 'valueOnly')

                #Check on Events
                try:
                    self.checkConfig(self.eventID, self.servers[0], expectedResults)
                except Exception as error:
                    # Still best-effort, but no longer traps KeyboardInterrupt
                    # or SystemExit, and the cause is recorded.
                    self.log.info("Issue reading the file at Node {0}".format(server.ip))
                    self.log.info("Cause: {0}".format(error))
        finally:
            # Do not leave the cluster pointing at the test folder.
            auditIns.setAuditLogPath(originalPath)
开发者ID:arod1987,项目名称:testrunner,代码行数:30,代码来源:auditcheckconfig.py


示例8: test_fileRotate20MB

 def test_fileRotate20MB(self):
     """Generate login events until audit.log is rotated by size.

     Logins are issued in batches of nine until the log file size reaches the
     threshold, the archive file appears, or the time limit is hit. The test
     passes when ``<hostname>-<timestamp>-audit.log`` exists afterwards.
     """
     sizeLimitBytes = 21089520   # just over 20 MiB (20971520 bytes)
     timeLimitSec = 36000        # stop generating events after 10 hours

     auditIns = audit(host=self.master)
     firstEventTime = self.getTimeStampForFile(auditIns)
     tempEventCounter = 0
     rest = RestConnection(self.master)
     shell = RemoteMachineShellConnection(self.master)
     # try/finally guarantees the shell is released even when a remote call
     # raises part-way through the loop.
     try:
         filePath = auditIns.pathLogFile + auditIns.AUDITLOGFILENAME
         number = int(shell.get_data_file_size(filePath))
         hostname = shell.execute_command("hostname")
         archiveFile = hostname[0][0] + '-' + firstEventTime + "-audit.log"
         result = shell.file_exists(auditIns.pathLogFile, archiveFile)
         tempTime = 0
         starttime = time.time()
         while number < sizeLimitBytes and tempTime < timeLimitSec and not result:
             for _ in range(1, 10):
                 rest.validateLogin("Administrator", "password", True, getContent=True)
                 tempEventCounter += 1
                 number = int(shell.get_data_file_size(filePath))
                 tempTime = int(time.time() - starttime)
                 result = shell.file_exists(auditIns.pathLogFile, archiveFile)
         self.sleep(30)
         result = shell.file_exists(auditIns.pathLogFile, archiveFile)
     finally:
         shell.disconnect()
     self.log.info("--------Total Event Created ---- {0}".format(tempEventCounter))
     self.assertTrue(result, "Archive Audit.log is not created on reaching 20MB threshhold")
开发者ID:arod1987,项目名称:testrunner,代码行数:26,代码来源:auditcheckconfig.py


示例9: test_role_assignment_audit

 def test_role_assignment_audit(self):
     """Validate the audit event produced by assigning, editing or removing roles.

     Test inputs:
         ops:       'assign' (default), 'edit' or 'remove'; any other value
                    fails the test immediately.
         user_name: display name sent in the role payload.

     Auditing is switched off (if it was on) and on again before the user
     operation, then the last event is compared with the expected fields.
     """
     ops = self.input.param("ops",'assign')
     if ops in ['assign','edit']:
         eventID=rbacmain.AUDIT_ROLE_ASSIGN
     elif ops == 'remove':
         eventID=rbacmain.AUDIT_REMOVE_ROLE
     else:
         # Previously this fell through and crashed later with an
         # UnboundLocalError on eventID.
         self.fail("Unsupported ops value: {0}".format(ops))

     Audit = audit(eventID=eventID, host=self.master)
     currentState = Audit.getAuditStatus()
     self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, currentState))
     if currentState:
         Audit.setAuditEnable('false')
     self.log.info ("Enabling Audit ")
     Audit.setAuditEnable('true')
     self.sleep(30)

     user_name = self.input.param("user_name")
     final_roles = rbacmain()._return_roles(self.user_role)
     payload = "name=" + user_name + "&roles=" + final_roles
     # The user is created with the initial roles in every scenario.
     rbacmain(self.master)._set_user_roles(user_name=self.user_id,payload=payload)

     # Fields common to all three scenarios.
     expectedResults = {"identity:source":"saslauthd","identity:user":self.user_id,
                        "real_userid:source":"ns_server","real_userid:user":"Administrator",
                        "ip":self.ipAddress, "port":123456}
     if ops == 'edit':
         payload = "name=" + user_name + "&roles=" + 'admin,cluster_admin'
         rbacmain(self.master)._set_user_roles(user_name=self.user_id,payload=payload)
         expectedResults.update({"full_name":"RitamSharma","roles":["admin","cluster_admin"]})
     elif ops == 'remove':
         rbacmain(self.master)._delete_user(self.user_id)
     else:
         expectedResults.update({"full_name":"RitamSharma","roles":["admin"]})

     fieldVerification, valueVerification = Audit.validateEvents(expectedResults)
     self.assertTrue(fieldVerification, "One of the fields is not matching")
     self.assertTrue(valueVerification, "Values for one of the fields is not matching")
开发者ID:chethanrao,项目名称:testrunner-archive,代码行数:35,代码来源:rbacTest.py


示例10: test_invalidLogPathCluster

 def test_invalidLogPathCluster(self):
     """Setting the audit log path to ``<current path>test`` must be rejected.

     The REST call is expected to return a falsy status and the
     'valid directory' error under ``errors.logPath``.
     """
     bogus_path = audit(host=self.master).getAuditLogPath() + 'test'
     status, content = RestConnection(self.master).setAuditSettings(logPath=bogus_path)

     self.assertFalse(status, "Audit is able to set invalid path")
     path_error = content['errors']['logPath']
     self.assertEqual(path_error, 'The value must be a valid directory', 'No error or error changed')
开发者ID:arod1987,项目名称:testrunner,代码行数:7,代码来源:auditcheckconfig.py


示例11: test_rotateIntervalShort

 def test_rotateIntervalShort(self):
     """An out-of-range rotate interval must be rejected with the range error.

     The interval comes from test input ``intervalSec``. If the server
     unexpectedly accepts it, the previously configured interval is written
     back before the failure is reported.
     """
     intervalSec = self.input.param("intervalSec", None)
     auditIns = audit(host=self.master)
     # Capture the current value BEFORE attempting the change so it can be
     # restored (it used to be read after the change and never used).
     originalInt = auditIns.getAuditRotateInterval()
     status, content = auditIns.setAuditRotateInterval(intervalSec)
     try:
         self.assertFalse(status, "Audit log interval setting is <900 or > 604800")
         self.assertEqual(content['errors']['rotateInterval'], 'The value of rotateInterval must be in range from 15 minutes to 7 days')
     finally:
         if status:
             auditIns.setAuditRotateInterval(originalInt)
开发者ID:arod1987,项目名称:testrunner,代码行数:8,代码来源:auditcheckconfig.py


示例12: test_changeLogPath

    def test_changeLogPath(self):
        """Move the audit log to ``<current path>folder`` and check it appears there.

        The folder is created (and chown'ed to couchbase) on the first
        ``nodes_init`` servers, the log path is changed through the first
        server, and after an auto-failover settings update on each node the
        audit log file is looked up in the new folder. The original path is
        written back on exit.
        """
        nodes_init = self.input.param("nodes_init", 0)
        master_audit = audit(host=self.servers[0])
        # Constructed as in the original flow; the object itself is not used.
        _second_audit = audit(host=self.servers[1])
        # Remember the path so it can be put back afterwards.
        saved_path = master_audit.getAuditLogPath()

        try:
            target_dir = master_audit.getAuditLogPath() + "folder"

            # Prepare the target directory on every participating node.
            for node in self.servers[:nodes_init]:
                remote = RemoteMachineShellConnection(node)
                try:
                    remote.create_directory(target_dir)
                    remote.execute_command('chown couchbase:couchbase ' + target_dir)
                finally:
                    remote.disconnect()

            source = 'ns_server'
            user = self.master.rest_username
            master_audit.setAuditLogPath(target_dir)

            # Trigger an auto-failover settings update on each node, then look
            # for the audit log in the new location.
            for node in self.servers[:nodes_init]:
                connection = RestConnection(node)
                expectedResults = {'max_nodes':1, "timeout":120, 'source':source, "user":user, 'ip':self.ipAddress, 'port':12345}
                connection.update_autofailover_settings(True, expectedResults['timeout'])

                self.sleep(120, 'Waiting for new audit file to get created')

                remote = RemoteMachineShellConnection(node)
                try:
                    found = remote.file_exists(target_dir, master_audit.AUDITLOGFILENAME)
                finally:
                    remote.disconnect()

                if found is False:
                    self.assertTrue(found, 'Issue with file getting create in new directory')

        finally:
            master_audit.setAuditLogPath(saved_path)
开发者ID:arod1987,项目名称:testrunner,代码行数:43,代码来源:auditcheckconfig.py


示例13: getDefaultState

    def getDefaultState(self):
        """Assert that the master does not report audit status 'true'."""
        reported = audit(host=self.master).getAuditStatus()

        # Only the exact string 'true' counts as enabled; every other value
        # is treated as disabled.
        self.assertFalse(reported == 'true', "Audit is not disabled by default")
开发者ID:arod1987,项目名称:testrunner,代码行数:10,代码来源:auditcheckconfig.py


示例14: test_clusterEndToEnd

    def test_clusterEndToEnd(self):
        """Exercise bucket-creation auditing while nodes leave/join and the log path changes.

        Steps: create a bucket on every server and validate its event;
        rebalance out ``self.servers[1:self.nodes_out + 1]``; move the audit
        log to a new folder and validate on the first node; re-add the nodes
        and validate on the second node; switch the path back and validate on
        the first node again. The original log path is always restored, and
        failures now propagate instead of being swallowed.
        """
        auditNodeFirst = audit(host=self.servers[0])
        origLogPath = auditNodeFirst.getAuditLogPath()

        try:
            # Create Events on both the nodes
            for server in self.servers:
                #Create an Event for Bucket Creation
                expectedResults = self.createBucketAudit(server, "TestBucket" + server.ip)
                self.checkConfig(self.eventID, server, expectedResults)

            #Remove one node from the cluser
            nodesOut = self.servers[1:self.nodes_out + 1]
            self.cluster.rebalance(self.servers, [], nodesOut)

            #Change path on first cluster + Create Bucket Event
            newPath = auditNodeFirst.getAuditLogPath() + "changeClusterLogPath"
            self.createRemoteFolder(self.servers[0], newPath)
            auditNodeFirst.setAuditLogPath(newPath)
            expectedResults = self.createBucketAudit(self.servers[0], "TestBucketFirstNode")
            self.checkConfig(self.eventID, self.servers[0], expectedResults)

            #Add one node to the cluster
            self.createRemoteFolder(self.servers[1], newPath)

            self.cluster.rebalance(self.servers, nodesOut, [])

            expectedResults = self.createBucketAudit(self.servers[1], "TestBucketSecondNode")
            self.checkConfig(self.eventID, self.servers[1], expectedResults)

            #Change path on first cluster + Create Bucket Event
            auditNodeFirst.setAuditLogPath(origLogPath)
            expectedResults = self.createBucketAudit(self.servers[0], "TestBucketFirstNode")
            self.checkConfig(self.eventID, self.servers[0], expectedResults)
        finally:
            # Was a bare "except:", which hid every error (including assertion
            # failures) so the test could never fail.
            auditNodeFirst.setAuditLogPath(origLogPath)
开发者ID:arod1987,项目名称:testrunner,代码行数:43,代码来源:auditcheckconfig.py


示例15: setUp

    def setUp(self):
        """Prepare the audit test: record local IP and event id, and enable auditing.

        ``self.eventID`` is read from test input ``id``. Auditing on the
        master is switched on (followed by a 30 second wait) only when it is
        not already enabled.
        """
        super(auditTest, self).setUp()
        self.ipAddress = self.getLocalIPAddress()
        self.eventID = self.input.param('id', None)
        # Indentation below is normalised to spaces; the original mixed tabs
        # and spaces, which Python 3 rejects.
        auditTemp = audit(host=self.master)
        currentState = auditTemp.getAuditStatus()
        self.log.info("Current status of audit on ip - {0} is {1}".format(self.master.ip, currentState))
        if not currentState:
            self.log.info("Enabling Audit ")
            auditTemp.setAuditEnable('true')
            self.sleep(30)
开发者ID:pkdevboxy,项目名称:testrunner,代码行数:11,代码来源:audittest.py


示例16: checkConfig

 def checkConfig(self, eventID, host, expectedResults):
     """Validate the audit event ``eventID`` on ``host`` against ``expectedResults``.

     Auditing is enabled on ``host`` first (with a 30 second wait) if it is
     not already on. The test fails when either the field check or the value
     check returned by ``validateEvents`` is falsy.

     Args:
         eventID: audit event id to look up.
         host: server object whose audit log is inspected.
         expectedResults: dict of expected event fields and values.
     """
     Audit = audit(eventID=eventID, host=host)
     currentState = Audit.getAuditStatus()
     # Report the node that was actually queried (the message used to print
     # self.master.ip even when host was a different node).
     self.log.info ("Current status of audit on ip - {0} is {1}".format(host.ip, currentState))
     if not currentState:
         self.log.info ("Enabling Audit ")
         Audit.setAuditEnable('true')
         self.sleep(30)
     fieldVerification, valueVerification = Audit.validateEvents(expectedResults)
     self.assertTrue(fieldVerification, "One of the fields is not matching")
     self.assertTrue(valueVerification, "Values for one of the fields is not matching")
开发者ID:EricACooper,项目名称:testrunner,代码行数:11,代码来源:x509tests.py


示例17: test_AuditEvent

    def test_AuditEvent(self):
        """Toggle auditing per test input ``ops`` and validate the config-reload event.

        Audit is always enabled first; 'disable' then switches it off and
        'enable' switches it on once more. The ``AUDITCONFIGRELOAD`` event on
        the master is compared with the audit configuration read back
        afterwards.
        """
        toggler = audit(host=self.master)
        ops = self.input.param("ops", None)
        # Constructed as in the original flow; the connection is not used.
        _rest = RestConnection(self.master)

        toggler.setAuditEnable('true')
        if ops == 'disable':
            toggler.setAuditEnable('false')
        elif ops == 'enable':
            toggler.setAuditEnable('true')

        # Read the configuration back through a fresh audit object.
        reader = audit(host=self.master)
        expectedResults = {
            "auditd_enabled": reader.getAuditStatus(),
            "descriptors_path": reader.getAuditConfigElement('descriptors_path'),
            # The last character of the log path is dropped before comparing.
            "log_path": (reader.getAuditLogPath())[:-1],
            "source": "internal",
            "user": "couchbase",
            "rotate_interval": 86400,
            "version": 1,
            'hostname': self.getHostName(self.master),
        }
        self.checkConfig(self.AUDITCONFIGRELOAD, self.master, expectedResults)
开发者ID:lichia,项目名称:testrunner,代码行数:22,代码来源:auditcheckconfig.py


示例18: test_enableStatusCluster

    def test_enableStatusCluster(self):
        """Verify that enabling/disabling audit on one node is visible on all nodes.

        Audit is enabled through the master and must read as enabled on the
        first ``nodes_init`` servers (default 2); it is then disabled through
        ``self.servers[1]`` and must read as disabled on the same servers.
        The state read at the start is restored on exit.
        """
        nodes_init = self.input.param("nodes_init", 2)
        auditIns = audit(host=self.master)
        origState = auditIns.getAuditStatus()
        auditIns.setAuditEnable('true')
        nodes = self.servers[:nodes_init]

        try:
            for node in nodes:
                tempStatus = audit(host=node).getAuditStatus()
                self.log.info("value of current status is {0} on ip -{1}".format(tempStatus, node.ip))
                self.assertTrue(tempStatus, "Audit is not enabled across the cluster")

            # Disable through a different node than the one used to enable.
            audit(host=self.servers[1]).setAuditEnable('false')

            for node in nodes:
                tempStatus = audit(host=node).getAuditStatus()
                self.log.info("value of current status is {0} on ip -{1}".format(tempStatus, node.ip))
                # Message corrected: this check fails when audit is still ON.
                self.assertFalse(tempStatus, "Audit is not disabled across the cluster")

        finally:
            auditIns.setAuditEnable(self.returnBoolVal(origState))
开发者ID:arod1987,项目名称:testrunner,代码行数:24,代码来源:auditcheckconfig.py


示例19: run_test_queryEvents

    def run_test_queryEvents(self):
        """Run one N1QL statement and validate the audit event it produces.

        ``self.op_type`` selects the statement ('select' or 'insert'); any
        other value fails the test with an explicit message. When
        ``self.filter`` is set, a filtered query is executed first and
        ``checkFilter`` is run for ``self.unauditedID`` afterwards.
        """
        # Uses the master's own IP as the expected client address.
        self.ipAddress = self.master.ip
        auditTemp = audit(host=self.master)
        currentState = auditTemp.getAuditStatus()
        self.log.info("Current status of audit on ip - {0} is {1}".format(self.master.ip, currentState))
        if not currentState:
            self.log.info("Enabling Audit ")
            auditTemp.setAuditEnable('true')
            self.sleep(30)
        rest = RestConnection(self.master)
        self.setupLDAPSettings(rest)
        query_type = self.op_type
        user = self.master.rest_username
        source = 'ns_server'

        # op_type -> (event name, event description, statement to run)
        statements = {
            'select': ('SELECT statement',
                       'A N1QL SELECT statement was executed',
                       'SELECT * FROM default LIMIT 100'),
            'insert': ('INSERT statement',
                       'A N1QL INSERT statement was executed',
                       'INSERT INTO default ( KEY, VALUE ) VALUES ("1",{ "order_id": "1", "type": '
                       '"order", "customer_id":"24601", "total_price": 30.3, "lineitems": '
                       '[ "11", "12", "13" ] })'),
        }
        if query_type not in statements:
            # Used to crash later with UnboundLocalError on expectedResults.
            self.fail("Unsupported op_type for query audit test: {0}".format(query_type))
        name, description, statement = statements[query_type]

        if self.filter:
            self.execute_filtered_query()
        self.run_cbq_query(server=self.master, query=statement)
        expectedResults = {'node': '%s:%s' % (self.master.ip, self.master.port), 'status': 'success', 'isAdHoc': True,
                           'name': name, 'real_userid': {'source': source, 'user': user},
                           'statement': statement,
                           'userAgent': 'Python-httplib2/$Rev: 259 $', 'id': self.eventID,
                           'description': description}

        # 'delete' events were validated on the second server in the original
        # code; the routing is kept so that adding a 'delete' entry to the
        # table above behaves the same way (currently unreachable).
        target = self.servers[1] if query_type == 'delete' else self.master
        self.checkConfig(self.eventID, target, expectedResults, n1ql_audit=True)
        if self.filter:
            self.checkFilter(self.unauditedID, target)
开发者ID:ritamcouchbase,项目名称:testrunner,代码行数:48,代码来源:n1ql_upgrade.py


示例20: setUp

 def setUp(self):
     """Prepare the x509 upgrade test.

     Reads ``initial_version`` and ``upgrade_version`` from the test input,
     resets the original state, generates certificates for all servers and
     records the local IP. When test input ``audit`` is truthy, auditing on
     the master is enabled if it is not already on.
     """
     super(x509_upgrade, self).setUp()
     self.initial_version = self.input.param("initial_version",'4.5.0-900')
     self.upgrade_version = self.input.param("upgrade_version", "4.5.0-1069")
     self._reset_original()
     x509main(self.master)._generate_cert(self.servers)
     self.ip_address = self.getLocalIPAddress()

     # Nothing more to do unless auditing was requested for this run.
     if not self.input.param('audit',None):
         return

     node_audit = audit(host=self.master)
     state = node_audit.getAuditStatus()
     self.log.info ("Current status of audit on ip - {0} is {1}".format(self.master.ip, state))
     if state:
         return

     self.log.info ("Enabling Audit ")
     node_audit.setAuditEnable('true')
     self.sleep(30)
开发者ID:EricACooper,项目名称:testrunner,代码行数:16,代码来源:x509tests.py



注:本文中的security.auditmain.audit函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rbacmain.rbacmain函数代码示例发布时间:2022-05-27
下一篇:
Python security.User类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap