• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test.CallTrace类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中seecr.test.CallTrace的典型用法代码示例。如果您正苦于以下问题:Python CallTrace类的具体用法?Python CallTrace怎么用?Python CallTrace使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了CallTrace类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: DrilldownQueriesTest

class DrilldownQueriesTest(SeecrTestCase):
    """Checks how DrilldownQueries hands 'x-drilldown-query' values to its observer."""

    def setUp(self):
        SeecrTestCase.setUp(self)
        self.dbdq = DrilldownQueries()
        # The CallTrace records every call; executeQuery is answered by mockExecuteQuery.
        self.observer = CallTrace(methods=dict(executeQuery=mockExecuteQuery))
        self.dbdq.addObserver(self.observer)

    def testDrilldownQuery(self):
        # The 'a = b' notation and the 'a exact b' notation must both arrive at
        # the observer as the same drilldownQueries value.
        result = retval(self.dbdq.executeQuery(extraArguments={'x-drilldown-query': ['a = b']}))
        self.assertEquals('result', result)
        self.assertEquals(['executeQuery'], self.observer.calledMethodNames())
        executeQueryMethod = self.observer.calledMethods[0]
        self.assertEquals([('a', ['b'])], executeQueryMethod.kwargs['drilldownQueries'])

        # Start the second scenario with an empty call log.
        self.observer.calledMethods.reset()

        result = retval(self.dbdq.executeQuery(extraArguments={'x-drilldown-query': ['a exact b']}))
        self.assertEquals('result', result)
        self.assertEquals(['executeQuery'], self.observer.calledMethodNames())
        executeQueryMethod = self.observer.calledMethods[0]
        self.assertEquals([('a', ['b'])], executeQueryMethod.kwargs['drilldownQueries'])

    def testErrorForInvalidFormatDrilldownQuery(self):
        # A value without an operator must raise ValueError and must not reach
        # the observer at all.
        try:
            retval(self.dbdq.executeQuery(extraArguments={'x-drilldown-query': ['a']}))
            self.fail()
        except ValueError as e:
            # 'as' form is valid on Python 2.6+ and Python 3; the comma form is Python 2 only.
            self.assertEquals('x-drilldown-query format should be field=value', str(e))
        self.assertEquals([], self.observer.calledMethodNames())
开发者ID:seecr,项目名称:meresco-components,代码行数:30,代码来源:drilldownqueriestest.py


示例2: testServiceExecuteQuery

 def testServiceExecuteQuery(self):
     # Posts a JSON 'executeQuery' message to LuceneRemoteService and verifies
     # both the JSON answer and the keyword arguments the observer received.
     def executeQuery(**kwargs):
         # The unreachable yield turns this into a generator whose outcome is
         # delivered through StopIteration.
         raise StopIteration(LuceneResponse(total=2, hits=['aap','noot']))
         yield

     luceneTrace = CallTrace('lucene')
     luceneTrace.methods['executeQuery'] = executeQuery
     remoteService = LuceneRemoteService(CallTrace('reactor'))
     remoteService.addObserver(luceneTrace)

     queryKwargs = {
         'cqlAbstractSyntaxTree': {'__CQL_QUERY__': 'query AND field=value'},
         'start':0,
         'stop': 10,
         'facets': [{'fieldname': 'field', 'maxTerms':5}],
         'filterQueries': [{'__CQL_QUERY__': 'query=fiets'}],
         'joinQueries': {'core1': {'__CQL_QUERY__': 'query=test'}}
     }
     requestBody = dumps({'message': 'executeQuery', 'kwargs': queryKwargs})

     rawResponse = ''.join(compose(service_call for service_call in [])) if False else ''.join(compose(remoteService.handleRequest(path='/__lucene_remote__', Method="POST", Body=requestBody)))
     responseHeader, responseBody = rawResponse.split('\r\n'*2)
     self.assertTrue('Content-Type: application/json' in responseHeader, responseHeader+responseBody)

     luceneResponse = LuceneResponse.fromJson(responseBody)
     self.assertEquals(2, luceneResponse.total)
     self.assertEquals(['aap', 'noot'], luceneResponse.hits)

     self.assertEquals(['executeQuery'], luceneTrace.calledMethodNames())
     tracedKwargs = luceneTrace.calledMethods[0].kwargs
     self.assertEquals(parseString('query AND field=value'), tracedKwargs['cqlAbstractSyntaxTree'])
     self.assertEquals(0, tracedKwargs['start'])
     self.assertEquals(10, tracedKwargs['stop'])
     self.assertEquals([{'fieldname': 'field', 'maxTerms':5}], tracedKwargs['facets'])
     self.assertEquals([parseString('query=fiets')], tracedKwargs['filterQueries'])
     self.assertEquals({'core1': parseString('query=test')}, tracedKwargs['joinQueries'])
开发者ID:FashtimeDotCom,项目名称:meresco-lucene,代码行数:33,代码来源:luceneremotetest.py


示例3: testUnusedTimeoutSetInitialisesTimer

    def testUnusedTimeoutSetInitialisesTimer(self):
        # Whitebox (unusedTimeout -> addTimer)
        mockReactor = CallTrace()
        SocketPool(reactor=mockReactor, unusedTimeout=0.02)
        self.assertEquals(['addTimer'], mockReactor.calledMethodNames())
        addTimerKwargs = mockReactor.calledMethods[0].kwargs
        # Compare sorted names: dict key order is an implementation detail, and
        # on Python 3 keys() is a view that never equals a list.
        self.assertEquals(['callback', 'seconds'], sorted(addTimerKwargs.keys()))
        self.assertEquals(0.02, addTimerKwargs['seconds'])

        # Blackbox
        def test():
            top = be((Observable(),
                (SocketPool(reactor=reactor(), unusedTimeout=0.02),),
            ))
            yield top.any.putSocketInPool(host='x', port=80, sock=MockSok('A'))
            yield top.any.putSocketInPool(host='x', port=80, sock=MockSok('B'))
            # Well inside the 0.02s timeout: a pooled socket is still handed out.
            yield sleep(seconds=0.001)

            result = yield top.any.getPooledSocket(host='x', port=80)
            self.assertEquals('B', result)

            # Twice the timeout: nothing is left to hand out.
            yield sleep(seconds=0.04)

            result = yield top.any.getPooledSocket(host='x', port=80)
            self.assertEquals(None, result)

        asProcess(test())
开发者ID:seecr,项目名称:weightless-core,代码行数:26,代码来源:socketpooltest.py


示例4: createUpload

def createUpload(about=None):
    """Return an Upload with id 'id', built on a traced repository with id 'repoId'.

    `about` is forwarded unchanged to oaiResponse().
    """
    tracedRepository = CallTrace('repository')
    tracedRepository.id = 'repoId'

    response = oaiResponse(about=about)
    newUpload = Upload(repository=tracedRepository, oaiResponse=response)
    newUpload.id = 'id'
    return newUpload
开发者ID:seecr,项目名称:meresco-harvester,代码行数:7,代码来源:filesystemuploadtest.py


示例5: testCollectLogWhenIndexRaisesError

    def testCollectLogWhenIndexRaisesError(self):
        # When executeQuery raises, only the SRU arguments are expected in the
        # collected log (see the final assertion).
        handler = SruHandler(enableCollectLog=True)
        observer = CallTrace('observer', emptyGeneratorMethods=['echoedExtraRequestData', 'extraResponseData', 'additionalDiagnosticDetails'])
        # Never passed to the handler explicitly, yet asserted on below.
        # NOTE(review): presumably located by name through the call stack, so
        # this local must keep its exact name - confirm against the framework.
        __callstack_var_logCollector__ = dict()
        # Canned clock: each timeNow() call consumes the next reading.
        times = [1]
        def timeNow():
            return times.pop(0)
        handler._timeNow = timeNow

        def executeQuery(**kwargs):
            # The unreachable yield makes this a generator, so the exception
            # surfaces when the generator is first advanced.
            raise Exception('Sorry')
            yield
        observer.methods['executeQuery'] = executeQuery
        handler.addObserver(observer)
        arguments = dict(startRecord=11, maximumRecords=15, query='query', recordPacking='string', recordSchema='schema')
        consume(handler.searchRetrieve(sruArguments=arguments, **arguments))

        self.assertEquals({
            'sru': {
                'arguments': [{
                    'startRecord': 11,
                    'query': 'query',
                    'recordPacking': 'string',
                    'maximumRecords': 15,
                    'recordSchema': 'schema',
                }],
            }
        }, __callstack_var_logCollector__)
开发者ID:seecr,项目名称:meresco-components,代码行数:28,代码来源:sruhandlertest.py


示例6: testShouldUnsetFlagImmediate

    def testShouldUnsetFlagImmediate(self):
        # Setting a flag leaves it pending ("_goingup"); unsetting it with
        # immediate=True clears both the flag and the pending marker.
        timerCounter = [0]
        def addTimer(*args, **kwargs):
            timerCounter[0] += 1
            return timerCounter[0]

        mockReactor = CallTrace('reactor', methods=dict(addTimer=addTimer))
        serviceRegistry = ServiceRegistry(mockReactor, self.tempdir, domainname="zp.example.org")
        tracedObserver = CallTrace('observer')
        serviceRegistry.addObserver(tracedObserver)

        serviceId = str(uuid4())
        serviceRegistry.updateService(identifier=serviceId, type='plein', ipAddress='127.0.0.1', infoport=1234, data={})
        # A second registry on the same directory takes over from here on.
        serviceRegistry = ServiceRegistry(mockReactor, self.tempdir, domainname="zp.example.org")

        readableFlag = READABLE
        serviceInfo = serviceRegistry.getService(serviceId)

        serviceRegistry.setFlag(serviceId, readableFlag, True)
        self.assertEqual([], mockReactor.calledMethodNames())
        serviceInfo = serviceRegistry.getService(serviceId)
        self.assertFalse(serviceInfo[readableFlag.name], serviceInfo)
        privateState = serviceRegistry.getPrivateStateFor(serviceId)
        self.assertTrue(privateState[readableFlag.name], privateState)
        self.assertTrue(privateState[readableFlag.name + "_goingup"])

        serviceRegistry.setFlag(serviceId, readableFlag, False, immediate=True)
        self.assertEqual([], mockReactor.calledMethodNames())
        serviceInfo = serviceRegistry.getService(serviceId)
        self.assertFalse(serviceInfo[readableFlag.name], serviceInfo)
        privateState = serviceRegistry.getPrivateStateFor(serviceId)
        self.assertFalse(privateState[readableFlag.name], privateState)
        self.assertFalse(readableFlag.name + "_goingup" in privateState)
开发者ID:seecr,项目名称:meresco-distributed,代码行数:33,代码来源:serviceregistrytest.py


示例7: testAllQueryHelpersForSRU

    def testAllQueryHelpersForSRU(self):
        # Runs an SRU searchRetrieve through the whole helper chain and checks
        # the single line that ends up in the query log file.
        index = CallTrace('index')
        def executeQuery(**kwargs):
            # The unreachable yield makes this a generator; the response is
            # delivered through StopIteration.
            raise StopIteration(Response(total=3201, hits=[]))
            yield
        index.methods['executeQuery'] = executeQuery
        index.ignoredAttributes.extend(['echoedExtraRequestData', 'extraResponseData', 'all_unknown'])
        server = be((Observable(),
            (self.queryLog,
                (SruParser(),
                    (QueryLogHelperForSru(),
                        (SruHandler(extraRecordDataNewStyle=True),
                            (QueryLogHelperForExecuteCQL(),
                                (index,)
                            )
                        )
                    )
                )
            ),
        ))

        # Only the side effect (the log line) matters; the response is discarded.
        ''.join(compose(server.all.handleRequest(
                path='/path/sru',
                Client=('11.22.33.44', 8080),
                arguments={
                    'operation': ['searchRetrieve'],
                    'version': ['1.2'],
                    'maximumRecords': ['0'],
                    'query': ['field=value'],
                    },
            )))
        # Read through a context manager so the handle is closed
        # deterministically instead of being left to garbage collection.
        with open(join(self.tempdir, '2009-11-02-query.log')) as logFile:
            logContents = logFile.read()
        self.assertEquals('2009-11-02T11:25:37Z 11.22.33.44 0.7K 1.000s 3201hits /path/sru maximumRecords=0&operation=searchRetrieve&query=field%3Dvalue&recordPacking=xml&recordSchema=dc&startRecord=1&version=1.2\n', logContents)
开发者ID:seecr,项目名称:meresco-components,代码行数:32,代码来源:querylogtest.py


示例8: testQueryTimeInExtraResponse

    def testQueryTimeInExtraResponse(self):
        handler = SruHandler(includeQueryTimes=True)
        observer = CallTrace('observer', emptyGeneratorMethods=['echoedExtraRequestData', 'extraResponseData'])

        times = [1, 2.5, 3.5]
        def timeNow():
            return times.pop(0)
        handler._timeNow = timeNow

        def executeQuery(**kwargs):
            response = Response(total=0, hits=[])
            response.queryTime=5
            raise StopIteration(response)
            yield
        observer.methods['executeQuery'] = executeQuery
        handler.addObserver(observer)
        arguments = dict(startRecord=11, maximumRecords=15, query='query', recordPacking='string', recordSchema='schema')
        result = "".join(compose(handler.searchRetrieve(sruArguments=arguments, **arguments)))
        sruResponse = parse(StringIO(result))
        extraResponseData = sruResponse.xpath('/srw:searchRetrieveResponse/srw:extraResponseData', namespaces={'srw':"http://www.loc.gov/zing/srw/"})[0]
        self.assertEqualsWS("""<srw:extraResponseData %(xmlns_srw)s %(xmlns_diag)s %(xmlns_xcql)s %(xmlns_dc)s %(xmlns_meresco_srw)s>
        <querytimes xmlns="http://meresco.org/namespace/timing">
            <sruHandling>PT2.500S</sruHandling>
            <sruQueryTime>PT1.500S</sruQueryTime>
            <index>PT0.005S</index>
        </querytimes>
</srw:extraResponseData>""" % namespaces, lxmltostring(extraResponseData))
        queryTimes = lxmltostring(extraResponseData.xpath('//ti:querytimes', namespaces={'ti':"http://meresco.org/namespace/timing"})[0])
        assertValid(queryTimes, join(schemasPath, 'timing-20120827.xsd'))
        self.assertEquals(['executeQuery', 'echoedExtraRequestData', 'extraResponseData', 'handleQueryTimes'], observer.calledMethodNames())
        self.assertEquals({'sru': Decimal("2.500"), 'queryTime': Decimal("1.500"), 'index': Decimal("0.005")}, observer.calledMethods[3].kwargs)
开发者ID:seecr,项目名称:meresco-components,代码行数:31,代码来源:sruhandlertest.py


示例9: testSimpleCall

 def testSimpleCall(self):
     # One argument-less call must be recorded exactly once, under its own name.
     trace = CallTrace()
     trace.simpleCall()
     self.assertEquals(1, len(trace.calledMethods))
     recorded = trace.calledMethods[0]
     self.assertEquals('simpleCall', recorded.name)
     self.assertEquals(0, len(recorded.args))
开发者ID:seecr,项目名称:seecr-test,代码行数:7,代码来源:calltracetest.py


示例10: setUp

 def setUp(self):
     # Builds a state/log directory fixture for domain 'adomain' and wires a
     # RepositoryStatus to a traced HarvesterData observer.
     SeecrTestCase.setUp(self)

     def writeFile(path, content):
         # Close the handle explicitly so the content is flushed to disk
         # instead of depending on garbage collection of the file object.
         with open(path, 'w') as f:
             f.write(content)

     self.stateDir = join(self.tempdir, "state")
     self.logDir = join(self.tempdir, "log")
     self.domainId = "adomain"
     makedirs(join(self.stateDir, self.domainId))
     repoId1LogDir = join(self.logDir, self.domainId, "invalid", "repoId1")
     repoId2LogDir = join(self.logDir, self.domainId, "invalid", escapeFilename("repoId/2"))
     makedirs(repoId1LogDir)
     makedirs(repoId2LogDir)
     # One diagnostic file per invalid record id.
     writeFile(join(repoId1LogDir, "invalidId1"), "<diagnostic>ERROR1</diagnostic>")
     writeFile(join(repoId1LogDir, "invalidId&2"), "<diagnostic>ERROR2</diagnostic>")
     writeFile(join(repoId2LogDir, escapeFilename("invalidId/3")), "<diagnostic>ERROR3</diagnostic>")
     # Per-repository lists of invalid ids; repoId3 has an empty list.
     writeFile(join(self.stateDir, self.domainId, "repoId1_invalid.ids"), "invalidId1\ninvalidId&2")
     writeFile(join(self.stateDir, self.domainId, escapeFilename("repoId/2_invalid.ids")), "invalidId/3")
     writeFile(join(self.stateDir, self.domainId, "repoId3_invalid.ids"), "")
     self.status = RepositoryStatus(self.logDir, self.stateDir)
     observer = CallTrace("HarvesterData")
     observer.returnValues["getRepositoryGroupIds"] = ["repoGroupId1", "repoGroupId2"]
     def getRepositoryIds(domainId, repositoryGroupId):
         if repositoryGroupId == "repoGroupId1":
             return ["repoId1", "repoId/2"]
         return ["repoId3", "anotherRepoId"]
     observer.methods["getRepositoryIds"] = getRepositoryIds
     def getRepositoryGroupId(domainId, repositoryId):
         return 'repoGroupId1' if repositoryId in ['repoId1', 'repoId/2'] else 'repoGroupId2'
     observer.methods["getRepositoryGroupId"] = getRepositoryGroupId
     self.status.addObserver(observer)
开发者ID:seecr,项目名称:meresco-harvester,代码行数:28,代码来源:repositorystatustest.py


示例11: testFieldWithMultiLevel

    def testFieldWithMultiLevel(self):
        # Both configured levels are queried in order; the 'year' level is the
        # one that ends up in the result.
        receivedArguments = []
        def doDrilldown(bitMatrixRow, fieldNamesAndMaxResults):
            receivedArguments.append((bitMatrixRow, fieldNamesAndMaxResults))
            self.assertEquals(1, len(fieldNamesAndMaxResults))
            levelField, levelMax, levelSorted = fieldNamesAndMaxResults[0]
            if levelField == 'yearAndMonth':
                answerField = 'yearAndMonth'
                termCounts = [('2008-01',11),('2008-02',2),('2007-12',1)]
            else:
                answerField = 'year'
                termCounts = [('2008',13),('2003',10),('2007',10)]
            # The unreachable yield makes this a generator; the answer travels
            # in StopIteration.
            raise StopIteration(iter([(answerField, iter(termCounts[:levelMax]))]))
            yield

        multiLevel = MultiLevelDrilldown(
            {'date':[('yearAndMonth', 2, False), ('year', 2, False)]
            }
        )
        tracedDrilldown = CallTrace('Drilldown')
        tracedDrilldown.drilldown = doDrilldown
        multiLevel.addObserver(tracedDrilldown)

        outcome = list(compose(multiLevel.multiLevelDrilldown('bitMatrixRow', ['date'])))

        self.assertEquals(2, len(receivedArguments))
        firstCall, secondCall = receivedArguments[0], receivedArguments[1]
        self.assertEquals(('bitMatrixRow', [('yearAndMonth', 2, False)]), firstCall)
        self.assertEquals(('bitMatrixRow', [('year', 2, False)]), secondCall)
        self.assertEquals(1, len(outcome))
        (requestedField, actualField), actualCounts = outcome[0]
        self.assertEquals('year', actualField)
        self.assertEquals([('2008',13),('2003',10)], list(actualCounts))
开发者ID:seecr,项目名称:meresco-components,代码行数:28,代码来源:multileveldrilldowntest.py


示例12: testTwoFieldNamesCalled

    def testTwoFieldNamesCalled(self):
        # Two requested fields lead to two drilldown calls and two result entries.
        receivedArguments = []
        def doDrilldown(bitMatrixRow, fieldNamesAndMaxResults):
            receivedArguments.append((bitMatrixRow, fieldNamesAndMaxResults))
            self.assertEquals(1, len(fieldNamesAndMaxResults))
            levelField, levelMax, levelSorted = fieldNamesAndMaxResults[0]
            if levelField == 'datelevel2':
                answerField = 'datelevel2'
                termCounts = [('2008',13),('2007',10)]
            else:
                answerField = 'type'
                termCounts = [('literature',43),('donaldduck',30)]
            # The unreachable yield makes this a generator; the answer travels
            # in StopIteration.
            raise StopIteration(iter([(answerField, iter(termCounts[:levelMax]))]))
            yield

        multiLevel = MultiLevelDrilldown(
            {'date':[('datelevel2',3, False),('datelevel1', 10, False)],
             'genre':[('type', 10, False)]
            }
        )
        tracedDrilldown = CallTrace('Drilldown')
        tracedDrilldown.drilldown = doDrilldown
        multiLevel.addObserver(tracedDrilldown)

        outcome = list(compose(multiLevel.multiLevelDrilldown('bitMatrixRow', ['date', 'genre'])))

        self.assertEquals(2, len(receivedArguments))
        firstCall, secondCall = receivedArguments[0], receivedArguments[1]
        self.assertEquals(('bitMatrixRow', [('datelevel2', 3, False)]), firstCall)
        self.assertEquals(('bitMatrixRow', [('type', 10, False)]), secondCall)
        self.assertEquals(2, len(outcome))
        fieldPairs = [(requested, actual) for (requested, actual), _counts in outcome]
        self.assertEquals([('date', 'datelevel2'),('genre', 'type')], fieldPairs)
开发者ID:seecr,项目名称:meresco-components,代码行数:27,代码来源:multileveldrilldowntest.py


示例13: testOne

    def testOne(self):
        # A single-level configuration results in exactly one drilldown call
        # whose answer is passed on under the requested field name.
        def drilldownStub(*args, **kwargs):
            # The unreachable yield makes this a generator; the answer travels
            # in StopIteration.
            raise StopIteration(iter([('datelevel1', iter([('2008',13),('2007',10)]))]))
            yield

        root = Observable()
        rowTrace = CallTrace('BitMatrixRow')
        multiLevel = MultiLevelDrilldown(
            {'date':[('datelevel1', 10, False)]}
        )
        drilldownTrace = CallTrace('Drilldown')
        drilldownTrace.methods['drilldown'] = drilldownStub
        multiLevel.addObserver(drilldownTrace)
        root.addObserver(multiLevel)

        outcome = list(compose(root.call.multiLevelDrilldown(rowTrace, ['date'])))

        self.assertEquals(1, len(drilldownTrace.calledMethods))
        recordedCall = drilldownTrace.calledMethods[0]
        self.assertEquals('drilldown', recordedCall.name)
        self.assertEquals((rowTrace, [('datelevel1', 10, False)]), recordedCall.args)
        self.assertEquals(1, len(outcome))
        (requestedField, actualField), actualCounts = outcome[0]
        self.assertEquals('date', requestedField)
        self.assertEquals('datelevel1', actualField)
        self.assertEquals([('2008',13),('2007',10)], list(actualCounts))
开发者ID:seecr,项目名称:meresco-components,代码行数:25,代码来源:multileveldrilldowntest.py


示例14: testWithSorting

    def testWithSorting(self):
        # Checks that a level marked as sorted is ordered by descending count,
        # and that an unsorted level keeps the order of the mock data.
        mockData = {
            'yearAndMonth': [('2008-01',1),('2008-02',2),('2007-12',11)],
            'year': [('2008',13),('2003',10),('2005',9), ('2007', 15)]
        }
        drilldown = CallTrace('Drilldown')
        def doDrilldown(bitMatrixRow, fieldNamesAndMaxResults):
            levelField, levelMax, levelSorted = fieldNamesAndMaxResults[0]
            data = mockData[levelField]
            if levelSorted:
                # Descending by cardinality. key/reverse replaces the former
                # cmp= lambda with tuple parameters (Python 2 only); the sort is
                # stable either way, so equal counts keep their original order.
                data = sorted(data, key=lambda termCount: termCount[1], reverse=True)
            if levelMax > 0:
                data = data[:levelMax]
            # The unreachable yield makes this a generator; the answer travels
            # in StopIteration.
            raise StopIteration(iter([(levelField, iter(data))]))
            yield
        drilldown.drilldown = doDrilldown

        multi = MultiLevelDrilldown({'date':[('yearAndMonth', 2, False), ('year', 3, True)]})
        multi.addObserver(drilldown)
        result = list(compose(multi.multiLevelDrilldown('bitMatrixRow', ['date'])))
        self.assertEquals([(('date', 'year'), [('2007', 15), ('2008', 13), ('2003', 10)])], result)

        multi = MultiLevelDrilldown({'date':[('yearAndMonth', 4, False), ('year', 3, False)]})
        multi.addObserver(drilldown)

        result = list(compose(multi.multiLevelDrilldown('bitMatrixRow', ['date'])))
        self.assertEquals([(('date', 'yearAndMonth'), [('2008-01',1),('2008-02',2),('2007-12',11)])], result)
开发者ID:seecr,项目名称:meresco-components,代码行数:27,代码来源:multileveldrilldowntest.py


示例15: testLogCanReturnCallables

    def testLogCanReturnCallables(self):
        # A callable in the middle of the response stream must not prevent the
        # request from being logged as exactly one line.
        observer= CallTrace('observer')
        observer.returnValues['handleRequest'] = (f for f in ['1', lambda: None,'3'])
        self.queryLog.addObserver(observer)
        list(compose(self.queryLog.handleRequest(Client=('127.0.0.1', 47785), path='/path/sru', otherArg='value')))

        # Read through a context manager so the handle is closed
        # deterministically instead of being left to garbage collection.
        with open(join(self.tempdir, '2009-11-02-query.log')) as logFile:
            logLines = logFile.readlines()
        self.assertEquals(1, len(logLines))
开发者ID:seecr,项目名称:meresco-components,代码行数:7,代码来源:querylogtest.py


示例16: testRemoteExecuteQuery

    def testRemoteExecuteQuery(self):
        # LuceneRemote must turn executeComposedQuery into one JSON httppost
        # and decode the JSON answer back into a response object.
        def httppost(*args, **kwargs):
            cannedResponse = LuceneResponse(total=5, hits=[Hit("1"), Hit("2"), Hit("3", duplicateCount=2), Hit("4"), Hit("5")])
            # The unreachable yield makes this a generator; the HTTP answer
            # travels in StopIteration.
            raise StopIteration('HTTP/1.0 200 Ok\r\n\r\n%s' % cannedResponse.asJson())
            yield

        httpTrace = CallTrace('http')
        httpTrace.methods['httppost'] = httppost
        luceneRemote = LuceneRemote(host='host', port=1234, path='/path')
        root = Observable()
        root.addObserver(luceneRemote)
        # Route outgoing HTTP traffic through the trace.
        luceneRemote._httppost = httpTrace.httppost

        composedQuery = ComposedQuery('coreA')
        composedQuery.setCoreQuery(
                core='coreA',
                query=parseString('query AND  field=value'),
                filterQueries=[parseString('query=fiets')],
                facets=[{'fieldname': 'field', 'maxTerms':5}],
            )
        composedQuery.setCoreQuery(core='coreB', query=parseString('query=test'))
        composedQuery.addMatch(dict(core='coreA', uniqueKey='keyA'), dict(core='coreB', key='keyB'))

        remoteResult = returnValueFromGenerator(root.any.executeComposedQuery(query=composedQuery))
        self.assertEquals(5, remoteResult.total)
        self.assertEquals([Hit("1"), Hit("2"), Hit("3", duplicateCount=2), Hit("4"), Hit("5")], remoteResult.hits)

        self.assertEquals(['httppost'], httpTrace.calledMethodNames())
        postKwargs = httpTrace.calledMethods[0].kwargs
        self.assertEquals('host', postKwargs['host'])
        self.assertEquals(1234, postKwargs['port'])
        self.assertEquals('/path/__lucene_remote__', postKwargs['request'])
        self.assertEquals('application/json', postKwargs['headers']['Content-Type'])

        sentMessage, sentKwargs = Conversion().jsonLoadMessage(postKwargs['body'])
        sentQuery = sentKwargs['query']
        self.assertEquals('executeComposedQuery', sentMessage)
        self.assertEquals('coreA', sentQuery.resultsFrom)
        self.assertEquals([{'fieldname': 'field', 'maxTerms':5}], sentQuery.facetsFor('coreA'))
开发者ID:FashtimeDotCom,项目名称:meresco-lucene,代码行数:35,代码来源:luceneremotetest.py


示例17: testApacheLog

    def testApacheLog(self):
        # The response parts pass through unchanged while ApacheLogWriter
        # writes one access-log line to the stream.
        responseParts = [Yield, okPlainText, "te", callable, "xt"]
        tracedHandler = CallTrace("handler", ignoredAttributes=["writeLog", "do_unknown"])
        # Deliberately a generator expression, as in the other handleRequest mocks.
        tracedHandler.returnValues["handleRequest"] = (f for f in responseParts)
        logStream = StringIO()
        requestLog = HandleRequestLog()
        # Fixed clock so the timestamp in the log line is predictable.
        requestLog._time = lambda: 1395409143.0

        tree = be(
            (Observable(),
                (LogCollector(),
                    (requestLog,
                        (tracedHandler,)
                    ),
                    (ApacheLogWriter(logStream),)
                )
            )
        )

        requestKwargs = dict(
            Method="GET",
            Client=("127.0.0.1", 1234),
            RequestURI="http://example.org/path?key=value",
            query="key=value",
            path="/path",
            Headers={"Referer": "http://meresco.org", "User-Agent": "Meresco-Components Test"},
            otherKwarg="value",
        )
        received = asList(tree.all.handleRequest(**requestKwargs))

        self.assertEquals([Yield, okPlainText, "te", callable, "xt"], received)
        self.assertEquals(["handleRequest"], tracedHandler.calledMethodNames())
        writtenLine = logStream.getvalue()
        self.assertEquals(
            '127.0.0.1 - - [21/Mar/2014:13:39:03 +0000] "GET /path?key=value HTTP/1.0" 200 64 "http://meresco.org" "Meresco-Components Test"\n',
            writtenLine,
        )
开发者ID:seecr,项目名称:meresco-components,代码行数:30,代码来源:handlerequestlogtest.py


示例18: testCollectLog

    def testCollectLog(self):
        # With enableCollectLog=True a successful searchRetrieve is expected to
        # leave timings, record count and arguments in the collected log (see
        # the final assertion).
        handler = SruHandler(enableCollectLog=True)
        observer = CallTrace('observer', emptyGeneratorMethods=['echoedExtraRequestData', 'extraResponseData'])
        # Never passed to the handler explicitly, yet asserted on below.
        # NOTE(review): presumably located by name through the call stack, so
        # this local must keep its exact name - confirm against the framework.
        __callstack_var_logCollector__ = dict()
        # Canned clock: each timeNow() call consumes the next reading.
        times = [1, 2.5, 3.5]
        def timeNow():
            return times.pop(0)
        handler._timeNow = timeNow

        def executeQuery(**kwargs):
            response = Response(total=0, hits=[])
            response.queryTime=5
            # The unreachable yield makes this a generator; the response is
            # delivered through StopIteration.
            raise StopIteration(response)
            yield
        observer.methods['executeQuery'] = executeQuery
        handler.addObserver(observer)
        arguments = dict(startRecord=11, maximumRecords=15, query='query', recordPacking='string', recordSchema='schema')
        consume(handler.searchRetrieve(sruArguments=arguments, **arguments))

        self.assertEquals({
            'sru': {
                'handlingTime': [Decimal('2.500')],
                'queryTime': [Decimal('1.500')],
                'indexTime': [Decimal('0.005')],
                'numberOfRecords': [0],
                'arguments': [{
                    'startRecord': 11,
                    'query': 'query',
                    'recordPacking': 'string',
                    'maximumRecords': 15,
                    'recordSchema': 'schema',
                }],
            }
        }, __callstack_var_logCollector__)
开发者ID:seecr,项目名称:meresco-components,代码行数:34,代码来源:sruhandlertest.py


示例19: testResumeOrThrowOnlyOnce

    def testResumeOrThrowOnlyOnce(self):
        # A.k.a. Promise / Future like behaviour.
        trace = CallTrace("Reactor")
        def getSuspend():
            # Returns a Suspend that has been handed to the reactor, with the
            # trace's call log emptied afterwards.
            trace.calledMethods.reset()
            suspend = Suspend(doNext=trace.doNext)
            self.assertEquals([], trace.calledMethodNames())

            suspend(reactor=trace, whenDone=trace.whenDone)
            self.assertEquals(['doNext', 'suspend'], trace.calledMethodNames())
            doNextM, suspendM = trace.calledMethods
            self.assertEquals(((suspend,), {}), (doNextM.args, doNextM.kwargs))
            self.assertEquals(((), {}), (suspendM.args, suspendM.kwargs))
            trace.calledMethods.reset()
            return suspend

        suspend = getSuspend()
        suspend.resume(response='whatever')
        self.assertEquals(['whenDone'], trace.calledMethodNames())
        trace.calledMethods.reset()
        # Below no change of result, no side-effects
        self.assertEquals('whatever', suspend.getResult())
        try:
            suspend.resume(response='DIFFERENT')
        except AssertionError as e:
            # 'as' form is valid on Python 2.6+ and Python 3; the comma form is Python 2 only.
            # NOTE(review): if resume() does not raise, this block is skipped
            # and the test passes silently - confirm whether that is intended.
            self.assertEquals('Suspend already settled.', str(e))
开发者ID:seecr,项目名称:weightless-core,代码行数:26,代码来源:suspendtest.py


示例20: testRewrite

    def testRewrite(self):
        # 'this_message' must arrive at the observer as 'to_message', with the
        # keyword arguments untouched, for all four dispatch styles.
        messageRewrite = MessageRewrite(fromMessage='this_message', toMessage='to_message')
        observer = CallTrace(emptyGeneratorMethods=['to_message'])
        tree = be((Observable(),
            (messageRewrite,
                (observer,),
            )
        ))

        def assertRewrittenCall():
            self.assertEqual(['to_message'], observer.calledMethodNames())
            self.assertEqual(dict(aap='noot'), observer.calledMethods[0].kwargs)

        # Generator-based dispatch: all / any.
        consume(tree.all.this_message(aap='noot'))
        assertRewrittenCall()

        observer.calledMethods.reset()
        consume(tree.any.this_message(aap='noot'))
        assertRewrittenCall()

        # Plain dispatch: call / do. 'to_message' is no longer registered as an
        # empty-generator method from here on.
        del observer.emptyGeneratorMethods[:]
        observer.calledMethods.reset()
        tree.call.this_message(aap='noot')
        assertRewrittenCall()

        observer.calledMethods.reset()
        tree.do.this_message(aap='noot')
        assertRewrittenCall()
开发者ID:seecr,项目名称:meresco-components,代码行数:27,代码来源:messagerewritetest.py



注:本文中的seecr.test.CallTrace类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test.SeecrTestCase类代码示例发布时间:2022-05-27
下一篇:
Python observate.load_filters函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap