• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python executor.execute函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中executor.execute函数的典型用法代码示例。如果您正苦于以下问题:Python execute函数的具体用法?Python execute怎么用?Python execute使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了execute函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: apt_supports_trusted_option

def apt_supports_trusted_option():
    """
    Since apt version 0.8.16~exp3 the option ``[trusted=yes]`` can be used in a
    ``sources.list`` file to disable GPG key checking (see `Debian bug
    #596498`_). This version of apt is included with Ubuntu 12.04 and later,
    but deb-pkg-tools also has to support older versions of apt. The
    :py:func:`apt_supports_trusted_option()` function checks if the installed
    version of apt supports the ``[trusted=yes]`` option, so that deb-pkg-tools
    can use it when possible.

    :returns: ``True`` if the option is supported, ``False`` if it is not.

    .. _Debian bug #596498: http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=596498
    """
    global trusted_option_supported
    if trusted_option_supported is None:
        try:
            # Find the installed version of the `apt' package.
            version = execute('dpkg-query','--show', '--showformat=${Version}', 'apt', capture=True)
            # Check if the version is >= 0.8.16 (which includes [trusted=yes] support).
            execute('dpkg','--compare-versions', version, 'ge', '0.8.16~exp3')
            # If ExternalCommandFailed  is not raised,
            # `dpkg --compare-versions' reported succes.
            trusted_option_supported = True
        except ExternalCommandFailed:
            trusted_option_supported = False
    return trusted_option_supported
开发者ID:dsoprea,项目名称:python-deb-pkg-tools,代码行数:27,代码来源:repo.py


示例2: runInteractive

def runInteractive():
    """
    Main function for interactive mode
    """
    print "------------- Welcome to the Miner %s ----------------" % miner_version.version
    print "You can run HELP command anytime to get more information."
    print "Press TAB key for context base completion"
    print "    - F1  key to get context base HELP"
    print "    - Ctrl-H to get list of keyboard bindings"
    print "    - Ctrl-D to exit"
    while True:
        s = ""
        try:
            s = raw_input(">>> ")
        except KeyboardInterrupt:
            print
            continue
        except EOFError:
            break
        if not s: continue
        executor.execute(s)
        if statements.Import.checkIfWasModified():
            global theSymbolCompleter
            theSymbolCompleter = CompleterWrap()

    print "\nGoodbye"
开发者ID:afeset,项目名称:miner2-tools,代码行数:26,代码来源:interactive.py


示例3: runInteractive

def runInteractive():
    """
    Main function for interactive mode
    """
    print "------------- Welcome to the Miner %s ----------------" % miner_version.version
    print "You can run HELP command anytime to get more information."
    print "Press TAB key for context base completion"
    print "    - F1  key to get miner command help"
    print "    - F2  key to get python documentation"
    print "    - Ctrl-K to get list of keyboard bindings"
    print "    - Ctrl-D to exit"
    miner_globals.setIsInteractive(True)
    while True:
        s = ""
        try:
            s = raw_input(">>> ")
        except KeyboardInterrupt:
            print
            continue
        except EOFError:
            break
        if not s: continue
        executor.execute(s)

    print "\nGoodbye"
开发者ID:minersoft,项目名称:miner,代码行数:25,代码来源:interactive.py


示例4: wkhtml_to_pdf

    def wkhtml_to_pdf(cls, data, options=None):
        """
        Call wkhtmltopdf to convert the html to pdf
        """
        with tempfile.NamedTemporaryFile(
                suffix='.html', prefix='trytond_', delete=False
        ) as source_file:
            file_name = source_file.name
            source_file.write(data)
            source_file.close()

            # Evaluate argument to run with subprocess
            args = 'wkhtmltopdf'
            # Add Global Options
            if options:
                for option, value in options.items():
                    args += ' --%s' % option
                    if value:
                        args += ' "%s"' % value

            # Add source file name and output file name
            args += ' %s %s.pdf' % (file_name, file_name)
            # Execute the command using executor
            execute(args)
            return open(file_name + '.pdf').read()
开发者ID:openlabs,项目名称:trytond-report-html,代码行数:25,代码来源:__init__.py


示例5: test_subprocess_output

 def test_subprocess_output(self):
     self.assertEqual(execute('echo', 'this is a test', capture=True), 'this is a test')
     self.assertEqual(execute('echo', '-e', r'line 1\nline 2', capture=True), 'line 1\nline 2\n')
     # I don't know how to test for the effect of silent=True in a practical
     # way without creating the largest test in this test suite :-). The
     # least I can do is make sure the keyword argument is accepted and the
     # code runs without exceptions in supported environments.
     self.assertTrue(execute('echo', 'this is a test', silent=True))
开发者ID:maikelwever,项目名称:python-executor,代码行数:8,代码来源:tests.py


示例6: copy_package_files

def copy_package_files(from_directory, to_directory, hard_links=True):
    """
    Copy package files to a temporary directory, using hard links when possible.

    :param from_directory: The pathname of a directory tree suitable for
                           packaging with ``dpkg-deb --build``.
    :param to_directory: The pathname of a temporary build directory.
    :param hard_links: Use hard links to speed up copying when possible.

    This function copies a directory tree suitable for packaging with
    ``dpkg-deb --build`` to a temporary build directory so that individual
    files can be replaced without changing the original directory tree. If the
    build directory is on the same file system as the source directory, hard
    links are used to speed up the copy. This function is used by
    :func:`build_package()`.
    """
    logger.info("Copying files (%s) to temporary directory (%s) ..",
                format_path(from_directory), format_path(to_directory))
    command = ['cp', '-a']
    makedirs(to_directory)
    if hard_links and ALLOW_HARD_LINKS:
        # Check whether we can use hard links to speed up the copy. In the past
        # this used the following simple and obvious check:
        #
        #   os.stat(source_directory).st_dev == os.stat(build_directory).st_dev
        #
        # However this expression holds true inside schroot, yet `cp -al' fails
        # when trying to create the hard links! This is why the following code now
        # tries to create an actual hard link to verify that `cp -al' can be used.
        test_file_from = None
        test_file_to = None
        try:
            # Find a unique filename that we can create and destroy without
            # touching any of the caller's files.
            while True:
                test_name = 'deb-pkg-tools-hard-link-test-%d' % random.randint(1, 1000)
                test_file_from = os.path.join(from_directory, test_name)
                test_file_to = os.path.join(to_directory, test_name)
                if not os.path.isfile(test_file_from):
                    break
            # Create the test file.
            with open(test_file_from, 'w') as handle:
                handle.write('test')
            os.link(test_file_from, test_file_to)
            logger.debug("Speeding up file copy using hard links ..")
            command.append('-l')
        except (IOError, OSError):
            pass
        finally:
            for test_file in [test_file_from, test_file_to]:
                if test_file and os.path.isfile(test_file):
                    os.unlink(test_file)
    # I know this looks really funky, but this is a valid use of shell escaping
    # and globbing (obviously I tested it ;-).
    command.append('%s/*' % pipes.quote(from_directory))
    command.append(pipes.quote(to_directory))
    execute(' '.join(command), logger=logger)
开发者ID:ddboline,项目名称:python-deb-pkg-tools,代码行数:57,代码来源:package.py


示例7: execute

def execute(stockId=None, date=None):

	now = datetime.datetime.now()
	if date is None or len(date)==0:
        	#convert western calendar to R.O.C calendar
		date = str(now.year-1911) + "{:02d}".format(now.month) + "{:02d}".format(now.day)
	elif len(date)==4:
		date = str(now.year-1911) + date[0:2] + date[2:4]

	executor.execute(stockId, date)
开发者ID:distagon,项目名称:stocktw,代码行数:10,代码来源:stock.py


示例8: rotate_backups

    def rotate_backups(self, directory):
        """
        Rotate the backups in a directory according to a flexible rotation scheme.
        :param directory: The pathname of a directory that contains backups to
                          rotate (a string).

        .. note:: This function binds the main methods of the
                  :class:`RotateBackups` class together to implement backup
                  rotation with an easy to use Python API. If you're using
                  `rotate-backups` as a Python API and the default behavior is
                  not satisfactory, consider writing your own
                  :func:`rotate_backups()` function based on the underlying
                  :func:`collect_backups()`, :func:`group_backups()`,
                  :func:`apply_rotation_scheme()` and
                  :func:`find_preservation_criteria()` methods.
        """
        # Load configuration overrides by user?

        # Collect the backups in the given directory. if rotate type is on local or on google drive
        sorted_backups = self.collect_backups(directory, self.rotate_type)
        if not sorted_backups:
            logger.info("No backups found in %s.", self.custom_format_path(directory))
            return
        most_recent_backup = sorted_backups[-1]
        # Group the backups by the rotation frequencies.
        backups_by_frequency = self.group_backups(sorted_backups)
        # Apply the user defined rotation scheme.
        self.apply_rotation_scheme(backups_by_frequency, most_recent_backup.datetime)
        # Find which backups to preserve and why.
        backups_to_preserve = self.find_preservation_criteria(backups_by_frequency)
        # Apply the calculated rotation scheme.
        for backup in sorted_backups:
            if backup in backups_to_preserve:
                matching_periods = backups_to_preserve[backup]
                logger.info("Preserving %s (matches %s retention %s) ..",
                            self.custom_format_path(backup.pathname),
                            concatenate(map(repr, matching_periods)),
                            "period" if len(matching_periods) == 1 else "periods")
            else:
                logger.info("Deleting %s %s ..", backup.type, self.custom_format_path(backup.pathname))
                if not self.dry_run:
                    timer = Timer()
                    if self.rotate_type == 'local':  # if rotate type is on local or on google drive
                        command = ['rm', '-Rf', backup.pathname]
                        if self.io_scheduling_class:
                            command = ['ionice', '--class', self.io_scheduling_class] + command

                        execute(*command, logger=logger)
                    else:
                        self.gdrivecm.delete_file(backup.pathname.split('_')[0])
                    logger.debug("Deleted %s in %s.", self.custom_format_path(backup.pathname), timer)
        if len(backups_to_preserve) == len(sorted_backups):
            logger.info("Nothing to do! (all backups preserved)")
开发者ID:dacopan,项目名称:autobackup-dcm,代码行数:53,代码来源:rotate_dcm.py


示例9: generate_key_file

    def generate_key_file(self, filename):
        """
        Generate a temporary host or client key for the OpenSSH server.

        The :func:`start()` method automatically calls :func:`generate_key_file()`
        to generate :data:`host_key_file` and :attr:`client_key_file`. This
        method uses the ``ssh-keygen`` program to generate the keys.
        """
        if not os.path.isfile(filename):
            timer = Timer()
            self.logger.debug("Generating SSH key file (%s) ..", filename)
            execute('ssh-keygen', '-f', filename, '-N', '', '-t', 'rsa', silent=True, logger=self.logger)
            self.logger.debug("Generated key file %s in %s.", filename, timer)
开发者ID:xolox,项目名称:python-executor,代码行数:13,代码来源:server.py


示例10: test_status_code_checking

 def test_status_code_checking(self):
     self.assertTrue(execute('true'))
     self.assertFalse(execute('false', check=False))
     self.assertRaises(ExternalCommandFailed, execute, 'false')
     try:
         execute('bash', '-c', 'exit 42')
         # Make sure the previous line raised an exception.
         self.assertTrue(False)
     except Exception as e:
         # Make sure the expected type of exception was raised.
         self.assertTrue(isinstance(e, ExternalCommandFailed))
         # Make sure the exception has the expected properties.
         self.assertEqual(e.command, "bash -c 'exit 42'")
         self.assertEqual(e.returncode, 42)
开发者ID:maikelwever,项目名称:python-executor,代码行数:14,代码来源:tests.py


示例11: run

def run(args):
    if (len(args) == 0) or (len(args) % 2 == 1):
        print(messages.HELP_STRING)
        return
    iterator = iter(args)
    jobs = izip(iterator, iterator)
    for job_path, params_json in jobs:
        try:
            job = simplejson.loads(file_get_contents(job_path))
            validictory.validate(job, job_schema)
            params = simplejson.loads(params_json)
            execute(job, params)
        except Exception, error:
            print(error)
开发者ID:Andrey-Khobnya,项目名称:sql-jobs-util,代码行数:14,代码来源:runner.py


示例12: application

def application(request):
    """
    To use this application, the user must send a POST request with
    base64 or form encoded encoded HTML content and the wkhtmltopdf Options in
    request data, with keys 'base64_html' and 'options'.
    The application will return a response with the PDF file.
    """
    if request.method != 'POST':
        return

    request_is_json = request.content_type.endswith('json')

    with tempfile.NamedTemporaryFile(suffix='.html') as source_file:

        if request_is_json:
            # If a JSON payload is there, all data is in the payload
            payload = json.loads(request.data)
            source_file.write(payload['contents'].decode('base64'))
            options = payload.get('options', {})
        elif request.files:
            # First check if any files were uploaded
            source_file.write(request.files['file'].read())
            # Load any options that may have been provided in options
            options = json.loads(request.form.get('options', '{}'))

        source_file.flush()

        # Evaluate argument to run with subprocess
        args = ['wkhtmltopdf']

        # Add Global Options
        if options:
            for option, value in options.items():
                args.append('--%s' % option)
                if value:
                    args.append('"%s"' % value)

        # Add source file name and output file name
        file_name = source_file.name
        args += [file_name, file_name + ".pdf"]

        # Execute the command using executor
        execute(' '.join(args))

        return Response(
            wrap_file(request.environ, open(file_name + '.pdf')),
            mimetype='application/pdf',
        )
开发者ID:ddonng,项目名称:docker-wkhtmltopdf-aas,代码行数:48,代码来源:app.py


示例13: mktx

 def mktx(self, recip, amount):
     if self.iscoinset():
         command = self.coinbinary + " payto -f " + self.fee + " " + recip + " " + str(amount)
         output = execute(command, capture="True")
         return output
     else:
         return False
开发者ID:marzig76,项目名称:bitspread,代码行数:7,代码来源:electrumwrapper.py


示例14: find_system_dependencies

    def find_system_dependencies(self, shared_object_files):
        """
        (Ab)use dpkg-shlibdeps_ to find dependencies on system libraries.

        :param shared_object_files: The pathnames of the ``*.so`` file(s) contained
                                    in the package (a list of strings).
        :returns: A list of strings in the format of the entries on the
                  ``Depends:`` line of a binary package control file.

        .. _dpkg-shlibdeps: https://www.debian.org/doc/debian-policy/ch-sharedlibs.html#s-dpkg-shlibdeps
        """
        logger.debug("Abusing `dpkg-shlibdeps' to find dependencies on shared libraries ..")
        # Create a fake source package, because `dpkg-shlibdeps' expects this...
        with TemporaryDirectory(prefix='py2deb-dpkg-shlibdeps-') as fake_source_directory:
            # Create the debian/ directory expected in the source package directory.
            os.mkdir(os.path.join(fake_source_directory, 'debian'))
            # Create an empty debian/control file because `dpkg-shlibdeps' requires
            # this (even though it is apparently fine for the file to be empty ;-).
            open(os.path.join(fake_source_directory, 'debian', 'control'), 'w').close()
            # Run `dpkg-shlibdeps' inside the fake source package directory, but
            # let it analyze the *.so files from the actual build directory.
            command = ['dpkg-shlibdeps', '-O', '--warnings=0'] + shared_object_files
            output = execute(*command, directory=fake_source_directory, capture=True, logger=logger)
            expected_prefix = 'shlibs:Depends='
            if not output.startswith(expected_prefix):
                msg = ("The output of dpkg-shlibdeps doesn't match the"
                       " expected format! (expected prefix: %r, output: %r)")
                logger.warning(msg, expected_prefix, output)
                return []
            output = output[len(expected_prefix):]
            dependencies = sorted(dependency.strip() for dependency in output.split(','))
            logger.debug("Dependencies reported by dpkg-shlibdeps: %s", dependencies)
            return dependencies
开发者ID:anjith2006,项目名称:py2deb,代码行数:33,代码来源:package.py


示例15: daemon

 def daemon(self, control):
     if self.iscoinset() and (control == "start" or control == "stop"):
         command = self.coinbinary + " daemon " + control
         output = execute(command, capture="True")
         return True
     else:
         return False
开发者ID:marzig76,项目名称:bitspread,代码行数:7,代码来源:electrumwrapper.py


示例16: listaddresses

 def listaddresses(self):
     if self.iscoinset():
         command = self.coinbinary + " listaddresses"
         output = execute(command, capture="True")
         return output
     else:
         return False
开发者ID:marzig76,项目名称:bitspread,代码行数:7,代码来源:electrumwrapper.py


示例17: getaddresshistory

 def getaddresshistory(self, address):
     if self.iscoinset():
         command = self.coinbinary + " getaddresshistory " + address
         output = execute(command, capture="True")
         return output
     else:
         return False
开发者ID:marzig76,项目名称:bitspread,代码行数:7,代码来源:electrumwrapper.py


示例18: find_channels_of_guest

def find_channels_of_guest(guest_name):
    """
    Find the pathnames of the channels associated to a guest.

    :param guest_name: The name of the guest (a string).
    :returns: A dictionary with channel names (strings) as keys and pathnames
              of UNIX socket files (strings) as values. If no channels are
              detected an empty dictionary will be returned.

    This function uses ``virsh dumpxml`` and parses the XML output to
    determine the pathnames of the channels associated to the guest.
    """
    logger.debug("Discovering '%s' channels using 'virsh dumpxml' command ..", guest_name)
    domain_xml = execute('virsh', 'dumpxml', guest_name, capture=True)
    parsed_xml = xml.etree.ElementTree.fromstring(domain_xml)
    channels = {}
    for channel in parsed_xml.findall('devices/channel'):
        if channel.attrib.get('type') == 'unix':
            source = channel.find('source')
            target = channel.find('target')
            if source is not None and target is not None and target.attrib.get('type') == 'virtio':
                name = target.attrib.get('name')
                path = source.attrib.get('path')
                if name in SUPPORTED_CHANNEL_NAMES:
                    channels[name] = path
    if channels:
        logger.debug("Discovered '%s' channels: %s", guest_name, channels)
    else:
        logger.debug("No channels found for guest '%s'.", guest_name)
    return channels
开发者ID:xolox,项目名称:python-negotiator,代码行数:30,代码来源:__init__.py


示例19: broadcast

 def broadcast(self, tx):
     if self.iscoinset():
         command = self.coinbinary + " broadcast " + tx
         output = execute(command, capture="True")
         return True
     else:
         return False
开发者ID:marzig76,项目名称:bitspread,代码行数:7,代码来源:electrumwrapper.py


示例20: find_running_guests

def find_running_guests():
    """
    Find the names of the guests running on the current host.

    This function parses the output of the ``virsh list`` command instead of
    using the libvirt API because of two reasons:

    1. I'm under the impression that the libvirt API is still very much in flux
       and large changes are still being made, so it's not the most stable
       foundation for Negotiator to find running guests.

    2. The Python libvirt API needs to match the version of the libvirt API on
       the host system and there is AFAIK no obvious way to express this in the
       ``setup.py`` script of Negotiator.

    :returns: A generator of strings.
    :raises: :exc:`GuestDiscoveryError` when ``virsh list`` fails.
    """
    try:
        logger.debug("Discovering running guests using 'virsh list' command ..")
        output = execute('virsh', '--quiet', 'list', '--all', capture=True, logger=logger)
    except ExternalCommandFailed:
        raise GuestDiscoveryError("The 'virsh list' command failed! Are you sure libvirtd is running?")
    else:
        for line in output.splitlines():
            logger.debug("Parsing 'virsh list' output: %r", line)
            try:
                vm_id, vm_name, vm_status = line.split(None, 2)
                if vm_status == 'running':
                    yield vm_name
            except Exception:
                logger.warning("Failed to parse 'virsh list' output! (%r)", line)
开发者ID:xolox,项目名称:python-negotiator,代码行数:32,代码来源:__init__.py



注:本文中的executor.execute函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python executor.run函数代码示例发布时间:2022-05-24
下一篇:
Python executil.system函数代码示例发布时间:2022-05-24
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap