• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python dirutil.safe_mkdtemp函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twitter.common.dirutil.safe_mkdtemp函数的典型用法代码示例。如果您正苦于以下问题:Python safe_mkdtemp函数的具体用法?Python safe_mkdtemp怎么用?Python safe_mkdtemp使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了safe_mkdtemp函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUpClass

  def setUpClass(cls):
    """Build the git fixtures shared by the tests of this class.

    Creates, each in its own ``safe_mkdtemp()`` directory:

    * ``cls.origin``   - a bare repository used as the remote,
    * ``cls.gitdir`` / ``cls.worktree`` - a repository addressed through the
      GIT_DIR / GIT_WORK_TREE environment variables, with remote ``depot``,
    * ``cls.clone2``   - a second repository with remote ``origin`` that pushes
      one more commit to ``cls.origin``.

    Finally ``cls.git`` is set to a ``Git`` object bound to ``cls.gitdir`` and
    ``cls.worktree``.
    """
    # Bare repository that acts as the shared remote.
    cls.origin = safe_mkdtemp()
    with pushd(cls.origin):
      subprocess.check_call(['git', 'init', '--bare'])

    # Git metadata and working tree live in two separate temp dirs.
    cls.gitdir = safe_mkdtemp()
    cls.worktree = safe_mkdtemp()

    cls.readme_file = os.path.join(cls.worktree, 'README')

    # All git commands in this block pick the repository up from the environment.
    with environment_as(GIT_DIR=cls.gitdir, GIT_WORK_TREE=cls.worktree):
      # init_repo is defined outside this snippet; presumably it initialises the
      # repository and registers cls.origin as remote 'depot' - confirm.
      cls.init_repo('depot', cls.origin)

      touch(cls.readme_file)
      subprocess.check_call(['git', 'add', 'README'])
      # The commit message deliberately contains a non-ASCII character
      # (presumably to exercise commit-message decoding - confirm against the tests).
      subprocess.check_call(['git', 'commit', '-am', 'initial commit with decode -> \x81b'])
      subprocess.check_call(['git', 'tag', 'first'])
      subprocess.check_call(['git', 'push', '--tags', 'depot', 'master'])
      # NOTE(review): newer git releases replace `--set-upstream` with
      # `--set-upstream-to` - confirm against the git version the tests run with.
      subprocess.check_call(['git', 'branch', '--set-upstream', 'master', 'depot/master'])

      # Second commit: local only, it is not pushed to 'depot' here.
      with safe_open(cls.readme_file, 'w') as readme:
        readme.write('Hello World.')
      subprocess.check_call(['git', 'commit', '-am', 'Update README.'])

    # Second repository: pulls from origin, appends to README and pushes back.
    cls.clone2 = safe_mkdtemp()
    with pushd(cls.clone2):
      cls.init_repo('origin', cls.origin)
      subprocess.check_call(['git', 'pull', '--tags', 'origin', 'master:master'])

      with safe_open(os.path.realpath('README'), 'a') as readme:
        readme.write('--')
      subprocess.check_call(['git', 'commit', '-am', 'Update README 2.'])
      subprocess.check_call(['git', 'push', '--tags', 'origin', 'master'])

    cls.git = Git(gitdir=cls.gitdir, worktree=cls.worktree)
开发者ID:aoen,项目名称:pants,代码行数:35,代码来源:test_git.py


示例2: set_up_mocks

  def set_up_mocks(self, su=None):
    """Record the mox expectations used by the test and switch mox to replay mode.

    Stubs out temp-dir creation, log initialisation, the git commands run through
    ``CommandUtil.execute_and_get_output``, the ``psutil`` and ``socket`` look-ups
    and ``sys.exit``, each with a canned return value.

    :param su: not used anywhere in this method body.
    """
    # The temp dir is pinned to a fixed path so the log file name is predictable.
    self.mox.StubOutWithMock(dirutil, 'safe_mkdtemp')
    dirutil.safe_mkdtemp().AndReturn('/tmp/test')
    self.mox.StubOutWithMock(log, 'init')
    log.init('/tmp/test/current_run').AndReturn(0)

    # Canned (return code, output) pairs for the two git invocations.
    self.mox.StubOutWithMock(CommandUtil, 'execute_and_get_output')
    stub = CommandUtil.execute_and_get_output(['git','remote', '-v'])
    stub.AndReturn((0, dedent("""origin  https://git.twitter.biz/science (fetch)
    origin  https://git.twitter.biz/science (push)""")))
    stub2 = CommandUtil.execute_and_get_output(['git','rev-parse', '--abbrev-ref', 'HEAD'])
    stub2.AndReturn((0,"test_br"))

    # Fixed system statistics.
    self.mox.StubOutWithMock(psutil, 'cpu_percent')
    psutil.cpu_percent(interval=1).AndReturn(1.0)
    self.mox.StubOutWithMock(psutil, 'network_io_counters')
    psutil.network_io_counters().AndReturn("1000,10000,1000")
    self.mox.StubOutWithMock(psutil, 'NUM_CPUS')
    # NUM_CPUS is read as a plain attribute, so it is assigned rather than recorded as a call.
    psutil.NUM_CPUS = 5

    # Fixed host identity.
    self.mox.StubOutWithMock(socket, 'gethostname')
    socket.gethostname().AndReturn("localhost")
    self.mox.StubOutWithMock(socket, 'gethostbyname')
    socket.gethostbyname("localhost").AndReturn("localhost")

    # sys.exit is stubbed so that the expected exit(0) call returns instead of exiting.
    self.mox.StubOutWithMock(sys, 'exit')
    sys.exit(0).AndReturn(0)
    # Everything above was recording; from here on the mocks replay.
    self.mox.ReplayAll()
开发者ID:benhuang-zh,项目名称:commons,代码行数:28,代码来源:buildtimestats_test.py


示例3: __init__

 def __init__(
     self,
     pex_location,
     checkpoint_root,
     artifact_dir=None,
     task_runner_class=ThermosTaskRunner,
     max_wait=Amount(1, Time.MINUTES),
     preemption_wait=Amount(1, Time.MINUTES),
     poll_interval=Amount(500, Time.MILLISECONDS),
     clock=time,
     process_logger_mode=None,
     rotate_log_size_mb=None,
     rotate_log_backups=None,
 ):
     """Remember the settings handed in by the caller.

     Every argument is stored unchanged on a same-named private attribute, with
     one exception: a falsy ``artifact_dir`` is replaced by a fresh directory
     obtained from ``safe_mkdtemp()``.
     """
     if artifact_dir:
         self._artifact_dir = artifact_dir
     else:
         self._artifact_dir = safe_mkdtemp()

     # Locations and runner type.
     self._pex_location = pex_location
     self._checkpoint_root = checkpoint_root
     self._task_runner_class = task_runner_class

     # Timing.
     self._clock = clock
     self._max_wait = max_wait
     self._preemption_wait = preemption_wait
     self._poll_interval = poll_interval

     # Process logging.
     self._process_logger_mode = process_logger_mode
     self._rotate_log_size_mb = rotate_log_size_mb
     self._rotate_log_backups = rotate_log_backups
开发者ID:mohammadsamir,项目名称:aurora,代码行数:25,代码来源:thermos_task_runner.py


示例4: make_distribution

def make_distribution(name='my_project', zipped=False, zip_safe=True):
  """Yield a distribution built from a generated throwaway project.

  The project is rendered from PROJECT_CONTENT with ``name`` as project name and
  a small module body chosen by ``zip_safe`` (the non-zip-safe body inspects
  ``__file__``). It is then installed and distilled into a new temp directory.

  With ``zipped`` true the distribution is loaded from the distilled archive
  itself; otherwise the archive is extracted with ``zipfile`` into a temporary
  directory and the distribution is loaded from the extracted path.

  This is a generator that yields exactly once.
  """
  if zip_safe:
    content = dedent('''
    def do_something():
      print('hello world!')
    ''')
  else:
    content = dedent('''
    if __file__ == 'derp.py':
      print('i am an idiot')
    ''')
  interp = {'project_name': name, 'content': content}

  with temporary_content(PROJECT_CONTENT, interp=interp) as project_dir:
    builder = Installer(project_dir)
    built = builder.distribution()
    packer = Distiller(built, debug=True)
    dist_location = packer.distill(into=safe_mkdtemp())
    if zipped:
      yield DistributionHelper.distribution_from_path(dist_location)
    else:
      # A distinct name keeps the outer project directory from being shadowed.
      with temporary_dir() as unpack_root:
        archive_name = os.path.basename(dist_location)
        extract_path = os.path.join(unpack_root, archive_name)
        with contextlib.closing(zipfile.ZipFile(dist_location)) as archive:
          archive.extractall(extract_path)
        yield DistributionHelper.distribution_from_path(extract_path)
开发者ID:CodeWarltz,项目名称:commons,代码行数:25,代码来源:test_common.py


示例5: __init__

  def __init__(self,
               runner_pex,
               task_id,
               task,
               role,
               portmap,
               sandbox,
               checkpoint_root,
               artifact_dir=None,
               clock=time,
               hostname=None,
               process_logger_destination=None,
               process_logger_mode=None,
               rotate_log_size_mb=None,
               rotate_log_backups=None,
               preserve_env=False):
    """
      runner_pex       location of the thermos_runner pex that this task runner should use
      task_id          task_id assigned by scheduler
      task             thermos pystachio Task object
      role             role to run the task under
      portmap          { name => port } dictionary
      sandbox          the sandbox object; its ``root`` and ``chrooted`` attributes are read
      checkpoint_root  the checkpoint root for the thermos runner
      artifact_dir     scratch space for the thermos runner (basically cwd of thermos.pex);
                       a new safe_mkdtemp() directory is used when not given
      clock            clock
      hostname         host name to record; socket.gethostname() is used when not given
      process_logger_destination, process_logger_mode,
      rotate_log_size_mb, rotate_log_backups
                       stored as given
      preserve_env     stored as given

      The task is also written to 'task.json' inside the artifact dir; TaskError is
      raised if the task is rejected as invalid while doing so.
    """
    # Identity of the task and of the runner binary.
    self._runner_pex = runner_pex
    self._task_id = task_id
    self._task = task

    # Nothing has been launched yet.
    self._popen = None
    self._popen_signal = None
    self._popen_rc = None
    self._monitor = None
    self._status = None

    # Execution environment.
    self._ports = portmap
    self._root = sandbox.root
    self._checkpoint_root = checkpoint_root
    self._enable_chroot = sandbox.chrooted
    self._preserve_env = preserve_env
    self._role = role
    self._clock = clock
    if artifact_dir:
      self._artifact_dir = artifact_dir
    else:
      self._artifact_dir = safe_mkdtemp()
    if hostname:
      self._hostname = hostname
    else:
      self._hostname = socket.gethostname()

    # Process logging.
    self._process_logger_destination = process_logger_destination
    self._process_logger_mode = process_logger_mode
    self._rotate_log_size_mb = rotate_log_size_mb
    self._rotate_log_backups = rotate_log_backups

    # wait events
    self._dead = threading.Event()
    self._kill_signal = threading.Event()
    self.forking = threading.Event()
    self.forked = threading.Event()

    task_json = os.path.join(self._artifact_dir, 'task.json')
    try:
      with open(task_json, 'w') as task_fp:
        self._task_filename = task_fp.name
        wrapper = ThermosTaskWrapper(self._task)
        wrapper.to_file(self._task_filename)
    except ThermosTaskWrapper.InvalidTask as err:
      raise TaskError('Failed to load task: %s' % err)
开发者ID:BruceZu,项目名称:aurora,代码行数:60,代码来源:thermos_task_runner.py


示例6: __init__

 def __init__(self, cache=None, failsoft=True, clock=time, opener=None):
     """Set up the cache directory and the underlying opener.

     A falsy ``cache`` is replaced by a new ``safe_mkdtemp()`` directory and a
     falsy ``opener`` by a new ``Web()``; the cache directory is passed to
     ``safe_mkdir`` in either case.
     """
     self._failsoft = failsoft
     if cache:
         self._cache = cache
     else:
         self._cache = safe_mkdtemp()
     safe_mkdir(self._cache)
     self._clock = clock
     if opener:
         self._opener = opener
     else:
         self._opener = Web()
     super(CachedWeb, self).__init__()
开发者ID:jalons,项目名称:commons,代码行数:7,代码来源:http.py


示例7: test_launchTask_deserialization_fail

    def test_launchTask_deserialization_fail(self):  # noqa
        """A task whose executor config data is "garbage" goes STARTING -> FAILED."""
        driver = ProxyDriver()

        user = getpass.getuser()
        broken = mesos_pb2.TaskInfo()
        broken.name = "broken"
        broken.task_id.value = "broken"

        # Build the assigned task piece by piece; only the executor config data is bogus.
        job_key = JobKey(role=user, environment="env", name="name")
        owner = Identity(role=user, user=user)
        executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data="garbage")
        task_config = TaskConfig(job=job_key, owner=owner, executorConfig=executor_config)
        broken.data = serialize(AssignedTask(task=task_config))

        runner_provider = make_provider(safe_mkdtemp())
        sandbox_provider = DefaultTestSandboxProvider()
        executor = FastThermosExecutor(
            runner_provider=runner_provider, sandbox_provider=sandbox_provider
        )
        executor.launchTask(driver, broken)
        driver.wait_stopped()

        # Exactly two status updates are expected, in this order.
        updates = driver.method_calls["sendStatusUpdate"]
        assert len(updates) == 2
        first_state = updates[0][0][0].state
        assert first_state == mesos_pb2.TASK_STARTING
        second_state = updates[1][0][0].state
        assert second_state == mesos_pb2.TASK_FAILED
开发者ID:rosmo,项目名称:aurora,代码行数:26,代码来源:test_thermos_executor.py


示例8: test_scheduler_runs

def test_scheduler_runs():
  """
    Verifies that the scheduler successfully launches 3 "no-op" MySQL tasks.
    NOTE: Due to the limitation of zake the scheduler's ZK operations are not propagated to
    executors in separate processes but they are unit-tested separately.

    The test requires 'dist/fake_mysos_executor.pex' to exist relative to the current
    working directory and runs the scheduler against a "local" Mesos driver.
  """
  # Imported inside the test rather than at module level
  # (presumably so the module can be imported without the native bindings - confirm).
  import mesos.native

  # Make sure fake_mysos_executor.pex is available to be fetched by Mesos slave.
  assert os.path.isfile('dist/fake_mysos_executor.pex')

  # Fake ZooKeeper client backed by fake storage.
  storage = FakeStorage(SequentialThreadingHandler())
  zk_client = FakeClient(storage=storage)
  zk_client.start()

  zk_url = "zk://fake_host/home/mysos/clusters"
  cluster_name = "test_cluster"
  num_nodes = 3

  # Scheduler state is kept under a throwaway temp directory.
  state_provider = LocalStateProvider(safe_mkdtemp())

  framework_info = FrameworkInfo(
      user=getpass.getuser(),
      name="mysos",
      checkpoint=False)

  state = Scheduler(framework_info)

  # Positional arguments; their meaning is defined by MysosScheduler, which is not
  # visible here.
  scheduler = MysosScheduler(
      state,
      state_provider,
      getpass.getuser(),
      os.path.abspath("dist/fake_mysos_executor.pex"),
      "./fake_mysos_executor.pex",
      zk_client,
      zk_url,
      Amount(40, Time.SECONDS),
      "/fakepath",
      gen_encryption_key())

  scheduler_driver = mesos.native.MesosSchedulerDriver(
      scheduler,
      framework_info,
      "local")
  scheduler_driver.start()

  # Wait until the scheduler is connected and becomes available.
  assert scheduler.connected.wait(30)

  scheduler.create_cluster(cluster_name, "mysql_user", num_nodes)

  # A slave is promoted to be the master.
  deadline(
      lambda: wait_for_master(
          get_cluster_path(posixpath.join(zk_url, 'discover'), cluster_name),
          zk_client),
      Amount(40, Time.SECONDS))

  # NOTE(review): the driver is stopped only if every step above succeeded, and the
  # stop() call itself sits inside an assert - consider a try/finally.
  assert scheduler_driver.stop() == DRIVER_STOPPED
开发者ID:GavinHwa,项目名称:mysos,代码行数:59,代码来源:test_mysos_scheduler.py


示例9: make_fileset

def make_fileset(filelist, piece_size, fs=DISK):
  """Given (filename, contents) list, return dir, FileSet pair.

  Each file is created under a new ``safe_mkdtemp()`` directory by filling and
  then writing a ``Fileslice`` covering ``len(contents)`` bytes through ``fs``.

  :param filelist: iterable of ``(filename, contents)`` pairs. It is consumed in a
    single pass, so one-shot iterables (e.g. generators) work as well as lists.
  :param piece_size: passed through to ``FileSet``.
  :param fs: object providing ``fill(slice)`` and ``write(slice, contents)``.
  :returns: ``(directory, FileSet)`` where the FileSet is built from the
    ``(filename, length)`` pairs in input order.
  """
  td = safe_mkdtemp()
  # Collect the sizes while writing. Iterating filelist a second time would yield
  # nothing for an exhausted iterator and produce an empty FileSet.
  sizes = []
  for filename, contents in filelist:
    length = len(contents)
    sl = Fileslice(os.path.join(td, filename), slice(0, length))
    fs.fill(sl)
    fs.write(sl, contents)
    sizes.append((filename, length))
  return td, FileSet(sizes, piece_size)
开发者ID:pombredanne,项目名称:rainman,代码行数:9,代码来源:testing.py


示例10: __init__

 def __init__(self, fileset, piece_hashes=None, chroot=None, fs=DISK):
   """Set up piece bookkeeping for ``fileset``.

   :param fileset: the file set being managed; its ``num_pieces`` is read when no
     hashes are supplied.
   :param piece_hashes: per-piece hashes. When falsy, one placeholder of 20 zero
     bytes per piece is used instead.
   :param chroot: directory to work in. When falsy a new ``safe_mkdtemp()``
     directory is used; the directory is passed to ``safe_mkdir`` either way.
   :param fs: stored as the filesystem implementation.
   """
   # Assigned once; the original stored the same value a second time further down.
   self._fileset = fileset
   self._pieces = piece_hashes or [b'\x00' * 20] * self._fileset.num_pieces
   self._actual_pieces = []
   self._sliceset = SliceSet()
   self._chroot = chroot or safe_mkdtemp()
   self._fs = fs
   safe_mkdir(self._chroot)
开发者ID:pombredanne,项目名称:rainman,代码行数:9,代码来源:piece_manager.py


示例11: __init__

 def __init__(self, host, port, endpoint, max_delay, stats_file, user, force_stats_upload=False):
   """Prepare logging and the HTTP client used for uploading stats.

   A new temp directory holds the log file "current_run", which is handed to
   ``log.init``. The stats directory is "/tmp/<user>/stats_uploader_dir" and is
   given to ``StatsHttpClient`` together with ``host``, ``port`` and ``endpoint``.
   The remaining arguments are stored as given.
   """
   self.force_stats_upload = force_stats_upload

   # Log file for this run.
   log_dir = dirutil.safe_mkdtemp()
   log_file = os.path.join(log_dir, "current_run")
   self._stats_log_dir = log_dir
   self._stats_log_file = log_file
   log.init(log_file)

   # Stats directory and upload client.
   stats_dir = os.path.join("/tmp", user, "stats_uploader_dir")
   self._stats_dir = stats_dir
   self._stats_http_client = StatsHttpClient(host, port, endpoint, stats_dir)

   self._max_delay = max_delay
   self._pants_stat_file = stats_file
   self._user = user
开发者ID:BabyDuncan,项目名称:commons,代码行数:10,代码来源:stats_http_client.py


示例12: test_inotify_diskcollector

def test_inotify_diskcollector():
  """Run the shared collector checks against an InotifyDiskCollector.

  The collector's thread is switched to a 50 ms collection interval and the
  wait callback sleeps for two such intervals.
  """
  watched_dir = safe_mkdtemp()
  sample_period = Amount(50, Time.MILLISECONDS)
  disk_collector = InotifyDiskCollector(watched_dir)
  disk_collector._thread.COLLECTION_INTERVAL = sample_period

  def wait():
    pause = 2 * sample_period
    time.sleep(pause.as_(Time.SECONDS))

  _run_collector_tests(disk_collector, watched_dir, wait)
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:10,代码来源:test_disk.py


示例13: thermos_runner_path

def thermos_runner_path(build=True):
  """Return the path of the thermos_runner pex, building it at most once.

  The path is memoized on the function object as ``thermos_runner_path.value``.

  :param build: when false, never build; return the memoized path, or None if
    nothing has been memoized yet.
  :returns: the memoized ``thermos_runner.pex`` path (or None, see ``build``).
  :raises AssertionError: if the pants build exits with a non-zero status.
  """
  if not build:
    return getattr(thermos_runner_path, 'value', None)

  if not hasattr(thermos_runner_path, 'value'):
    pex_dir = safe_mkdtemp()
    # Run the build outside of an `assert`: assert statements are removed under
    # `python -O`, which would skip the build and memoize a path to a pex that
    # was never produced.
    status = subprocess.call(["./pants", "--pants-distdir=%s" % pex_dir, "binary",
      "src/main/python/apache/thermos/runner:thermos_runner"])
    if status != 0:
      # Same exception type as before, so existing callers are unaffected.
      raise AssertionError('pants build of thermos_runner exited with status %s' % status)
    thermos_runner_path.value = os.path.join(pex_dir, 'thermos_runner.pex')
  return thermos_runner_path.value
开发者ID:bmhatfield,项目名称:aurora,代码行数:10,代码来源:test_thermos_executor.py


示例14: test_du_diskcollector

def test_du_diskcollector():
  """Run the shared collector checks against a DiskCollector.

  The wait callback triggers a sample and, if the collector has a thread,
  blocks on that thread's event.
  """
  scratch_dir = safe_mkdtemp()
  disk_collector = DiskCollector(scratch_dir)

  def wait():
    disk_collector.sample()
    if collector_idle():
      return
    disk_collector._thread.event.wait()

  def collector_idle():
    # True when sampling left no thread to wait for.
    return disk_collector._thread is None

  _run_collector_tests(disk_collector, scratch_dir, wait)
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:10,代码来源:test_disk.py


示例15: _unpack

 def _unpack(self, filename, location=None):
   """Extract the archive ``filename`` and return its first non-trivial directory.

   The archive is extracted into ``location``; when ``location`` is falsy a new
   temporary directory is created for it. The archive type and the error it
   raises when unreadable both come from ``self._archive_class``.

   Raises ``self.UnreadableLink`` if that error occurs while opening or extracting.
   """
   destination = location if location else safe_mkdtemp()
   opener, read_error = self._archive_class
   try:
     archive = opener(filename)
     with contextlib.closing(archive) as package:
       package.extractall(path=destination)
   except read_error:
     raise self.UnreadableLink('Could not read %s' % self.url)
   return self.first_nontrivial_dir(destination)
开发者ID:BabyDuncan,项目名称:commons,代码行数:11,代码来源:link.py


示例16: fetch

 def fetch(self, location=None, conn_timeout=None):
   """Make the link's file available locally and return its path.

   * A local link with no ``location`` requested returns ``self._url.path`` as is.
   * Otherwise the file is placed at ``<location>/<self.filename>``, with a new
     temp directory used when ``location`` is falsy. An already existing target is
     returned without fetching again.

   ``conn_timeout`` is forwarded to ``self.fh``.
   """
   if self.local and location is None:
     return self._url.path

   dest_dir = location if location else safe_mkdtemp()
   target = os.path.join(dest_dir, self.filename)
   if not os.path.exists(target):
     with contextlib.closing(self.fh(conn_timeout=conn_timeout)) as remote:
       safe_mkdir(os.path.dirname(target))
       with open(target, 'wb') as local_fp:
         payload = remote.read()
         local_fp.write(payload)
   return target
开发者ID:BabyDuncan,项目名称:commons,代码行数:12,代码来源:link.py


示例17: create_run_tracker

def create_run_tracker(info_dir=None):
  """Creates a ``RunTracker``, starts it with a new ``Report`` and returns it.

  :param string info_dir: An optional directory for the run tracker to store state; defaults to a
    new temp dir that will be cleaned up on interpreter exit.
  """
  # TODO(John Sirois): Rework uses around a context manager for cleanup of the info_dir in a more
  # disciplined manner
  if not info_dir:
    info_dir = safe_mkdtemp()
  tracker = RunTracker(info_dir)
  fresh_report = Report()
  tracker.start(fresh_report)
  return tracker
开发者ID:CodeWarltz,项目名称:commons,代码行数:13,代码来源:context_utils.py


示例18: __init__

  def __init__(self, source_dir, strict=True):
    """
      Create an installer from an unpacked source distribution in source_dir.

      If strict=True, fail if any installation dependencies (e.g. distribute)
      are missing.

      A temp directory for the installation and an empty temp file for the
      install record are created up front.
    """
    self._source_dir = source_dir
    self._install_tmp = safe_mkdtemp()
    self._installed = None
    self._strict = strict
    # Only the path of the record file is needed, so the descriptor is closed right away.
    record_fd, record_path = tempfile.mkstemp()
    self._install_record = record_path
    os.close(record_fd)
开发者ID:BabyDuncan,项目名称:commons,代码行数:13,代码来源:installer.py


示例19: test_mkdtemp_setup_teardown

def test_mkdtemp_setup_teardown():
  """Check safe_mkdtemp's per-pid bookkeeping and its atexit cleaner.

  With ``atexit.register``, ``os.getpid``, ``tempfile.mkdtemp`` and
  ``dirutil.safe_rmtree`` mocked out, two ``safe_mkdtemp`` calls must record both
  directories under the current (fake) pid, and running the cleaner must remove
  exactly that pid's entry while leaving another pid's entry untouched.
  """
  m = mox.Mox()

  def faux_cleaner():
    pass

  DIR1, DIR2 = 'fake_dir1__does_not_exist', 'fake_dir2__does_not_exist'
  m.StubOutWithMock(atexit, 'register')
  m.StubOutWithMock(os, 'getpid')
  m.StubOutWithMock(tempfile, 'mkdtemp')
  m.StubOutWithMock(dirutil, 'safe_rmtree')
  # Record phase: the calls below are expectations, not real invocations.
  atexit.register(faux_cleaner) # ensure only called once
  tempfile.mkdtemp(dir='1').AndReturn(DIR1)
  tempfile.mkdtemp(dir='2').AndReturn(DIR2)
  os.getpid().MultipleTimes().AndReturn('unicorn')
  dirutil.safe_rmtree(DIR1)
  dirutil.safe_rmtree(DIR2)
  # make sure other "pids" are not cleaned
  dirutil._MKDTEMP_DIRS['fluffypants'].add('yoyo')

  try:
    m.ReplayAll()
    assert dirutil.safe_mkdtemp(dir='1', cleaner=faux_cleaner) == DIR1
    assert dirutil.safe_mkdtemp(dir='2', cleaner=faux_cleaner) == DIR2
    assert 'unicorn' in dirutil._MKDTEMP_DIRS
    assert dirutil._MKDTEMP_DIRS['unicorn'] == set([DIR1, DIR2])
    dirutil._mkdtemp_atexit_cleaner()
    assert 'unicorn' not in dirutil._MKDTEMP_DIRS
    assert dirutil._MKDTEMP_DIRS['fluffypants'] == set(['yoyo'])

  finally:
    # Restore the module-level registry so other tests are not affected.
    dirutil._MKDTEMP_DIRS.pop('unicorn', None)
    dirutil._MKDTEMP_DIRS.pop('fluffypants', None)
    dirutil._mkdtemp_unregister_cleaner()

    m.UnsetStubs()
    m.VerifyAll()
开发者ID:BabyDuncan,项目名称:commons,代码行数:37,代码来源:test_dirutil.py


示例20: __init__

 def __init__(self, peer_id, chroot=None, io_loop=None, session_impl=Session, fs=DISK):
     """Initialise an empty client for ``peer_id``.

     The client's IP is resolved from the local host name. A falsy ``chroot`` is
     replaced by a new ``safe_mkdtemp()`` directory, and the chroot is passed to
     ``safe_mkdir`` either way. All per-torrent maps start empty, no port is
     bound yet, and ``io_loop`` is forwarded to the parent initialiser.
     """
     self.peer_id = peer_id
     local_name = socket.gethostname()
     self._ip = socket.gethostbyname(local_name)
     if chroot:
         self._chroot = chroot
     else:
         self._chroot = safe_mkdtemp()
     safe_mkdir(self._chroot)

     # Each of the following maps is keyed by handshake prefix.
     self._torrents = {}  # => Torrent
     self._trackers = {}  # => PeerTracker
     self._sessions = {}  # => Session
     self._piece_brokers = {}  # => PieceBroker

     self._failed_handshakes = 0
     self._port = None
     self._session_impl = session_impl
     self._fs = fs  # this should probably be broker_impl
     self._peer_callback = self.default_peer_callback
     super(Client, self).__init__(io_loop=io_loop)
开发者ID:pombredanne,项目名称:rainman,代码行数:15,代码来源:client.py



注:本文中的twitter.common.dirutil.safe_mkdtemp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dirutil.safe_open函数代码示例发布时间:2022-05-27
下一篇:
Python dirutil.safe_mkdir函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap