• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python select.devpoll函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中select.devpoll函数的典型用法代码示例。如果您正苦于以下问题:Python devpoll函数的具体用法?Python devpoll怎么用?Python devpoll使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了devpoll函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_events_mask_overflow_c_limits

 def test_events_mask_overflow_c_limits(self):
     from _testcapi import USHRT_MAX
     pollster = select.devpoll()
     w, r = os.pipe()
     pollster.register(w)
     # Issue #17919
     self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
     self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
开发者ID:5outh,项目名称:Databases-Fall2014,代码行数:8,代码来源:test_devpoll.py


示例2: test_devpoll

    def test_devpoll(self):
        poller = select.devpoll()
        self.addCleanup(poller.close)

        t0 = time.monotonic()
        poller.poll(self.sleep_time * 1e3)
        dt = time.monotonic() - t0
        self.stop_alarm()
        self.assertGreaterEqual(dt, self.sleep_time)
开发者ID:cpcloud,项目名称:cpython,代码行数:9,代码来源:eintr_tester.py


示例3: test_events_mask_overflow

 def test_events_mask_overflow(self):
     pollster = select.devpoll()
     w, r = os.pipe()
     pollster.register(w)
     # Issue #17919
     self.assertRaises(OverflowError, pollster.register, 0, -1)
     self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
     self.assertRaises(OverflowError, pollster.modify, 1, -1)
     self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:9,代码来源:test_devpoll.py


示例4: test_devpoll1

    def test_devpoll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipe and test that poll works with them.

        p = select.devpoll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        readers = []
        writers = []
        r2w = {}
        w2r = {}

        for i in range(NUM_PIPES):
            rd, wr = os.pipe()
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            readers.append(rd)
            writers.append(wr)
            r2w[rd] = wr
            w2r[wr] = rd

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                self.fail("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                self.fail("no pipes ready for reading")
            self.assertEqual([w2r[wr]], ready_readers)
            rd = ready_readers[0]
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)
            os.close(r2w[rd]) ; os.close(rd)
            p.unregister(r2w[rd])
            p.unregister(rd)
            writers.remove(r2w[rd])

        self.assertEqual(bufs, [MSG] * NUM_PIPES)
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:49,代码来源:test_devpoll.py


示例5: test_timeout_overflow

    def test_timeout_overflow(self):
        pollster = select.devpoll()
        w, r = os.pipe()
        pollster.register(w)

        pollster.poll(-1)
        self.assertRaises(OverflowError, pollster.poll, -2)
        self.assertRaises(OverflowError, pollster.poll, -1 << 31)
        self.assertRaises(OverflowError, pollster.poll, -1 << 64)

        pollster.poll(0)
        pollster.poll(1)
        pollster.poll(1 << 30)
        self.assertRaises(OverflowError, pollster.poll, 1 << 31)
        self.assertRaises(OverflowError, pollster.poll, 1 << 63)
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:16,代码来源:test_devpoll.py


示例6: test_close

    def test_close(self):
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        fd = open_file.fileno()
        devpoll = select.devpoll()

        # test fileno() method and closed attribute
        self.assertIsInstance(devpoll.fileno(), int)
        self.assertFalse(devpoll.closed)

        # test close()
        devpoll.close()
        self.assertTrue(devpoll.closed)
        self.assertRaises(ValueError, devpoll.fileno)

        # close() can be called more than once
        devpoll.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
        self.assertRaises(ValueError, devpoll.poll)
        self.assertRaises(ValueError, devpoll.register, fd, fd, select.POLLIN)
        self.assertRaises(ValueError, devpoll.unregister, fd)
开发者ID:5outh,项目名称:Databases-Fall2014,代码行数:23,代码来源:test_devpoll.py


示例7: __init__

 def __init__(self):
     super().__init__()
     self._devpoll = select.devpoll()
开发者ID:Alex9029,项目名称:cpython,代码行数:3,代码来源:selectors.py


示例8: __init__

 def __init__(self):
     super(DevpollSelector, self).__init__()
     self._devpoll = select.devpoll()
开发者ID:dpkp,项目名称:kafka-python,代码行数:3,代码来源:selectors34.py


示例9: test_fd_non_inheritable

 def test_fd_non_inheritable(self):
     devpoll = select.devpoll()
     self.addCleanup(devpoll.close)
     self.assertEqual(os.get_inheritable(devpoll.fileno()), False)
开发者ID:5outh,项目名称:Databases-Fall2014,代码行数:4,代码来源:test_devpoll.py


示例10: __init_poller

 def __init_poller(self):
     self.dp = select.devpoll()
开发者ID:ChrisCalderon,项目名称:Yashttpd,代码行数:2,代码来源:poller.py



注:本文中的select.devpoll函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python select.epoll函数代码示例发布时间:2022-05-27
下一篇:
Python helpers.Watcher类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap