• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python ustruct.unpack函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 ustruct.unpack 函数的典型用法代码示例。如果您正苦于以下问题：Python unpack 函数的具体用法？Python unpack 怎么用？Python unpack 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了unpack函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: parse_bundle

def parse_bundle(bundle, strict=False):
    """Parse a binary OSC bundle.

    Returns a generator which walks over all contained messages and bundles
    recursively, depth-first. Each item yielded is a (timetag, message) tuple.

    :param bundle: bytes object holding the bundle; it must begin with the
        eight-byte marker ``#bundle`` followed by a NUL byte
    :param strict: forwarded to ``parse_message`` for every contained
        message, including those inside nested bundles
    :raises TypeError: if the bundle marker is missing (raised when the
        generator is first advanced)

    """
    if not bundle.startswith(b'#bundle\0'):
        raise TypeError("Bundle must start with '#bundle\\0'.")

    # Bytes 8..15 hold the time tag as two big-endian 32-bit words.
    ofs = 16
    timetag = to_time(*unpack('>II', bundle[8:ofs]))

    while ofs < len(bundle):
        # Each element is prefixed by its size as a big-endian 32-bit word.
        size = unpack('>I', bundle[ofs:ofs + 4])[0]
        element = bundle[ofs + 4:ofs + 4 + size]
        ofs += size + 4

        # The element is bytes, so the prefix must be bytes as well.
        if element.startswith(b'#bundle'):
            for el in parse_bundle(element, strict):
                yield el
        else:
            yield timetag, parse_message(element, strict)
开发者ID:SpotlightKid,项目名称:micropython-osc,代码行数:26,代码来源:server.py


示例2: _handle_suback

    def _handle_suback(self):
        """Decode the pending SUBACK packet and report it to ``on_subscribe``.

        The packet body is split into a 16-bit message id followed by one
        unsigned byte per granted QoS value.  When ``self.on_subscribe`` is
        set it is called with ``(self, self._userdata, mid, granted_qos)``
        where ``granted_qos`` is a tuple of ints.

        :return: ``MQTT_ERR_SUCCESS``
        """
        print("_handle_suback") #needed after _packet_handle
        body = self._in_packet['packet']

        # "!H<n>s": network-order unsigned short, then the remaining n bytes.
        mid, qos_bytes = struct.unpack("!H%ds" % (len(body) - 2), body)
        # One "B" per remaining byte turns the tail into a tuple of ints.
        granted_qos = struct.unpack("!" + len(qos_bytes) * "B", qos_bytes)

        if self.on_subscribe:
            self.on_subscribe(self, self._userdata, mid, granted_qos)

        return MQTT_ERR_SUCCESS
开发者ID:slzatz,项目名称:miscellaneous,代码行数:11,代码来源:umqtt3.py


示例3: parse_message

def parse_message(msg, strict=False):
    """Parse a binary OSC message.

    :param msg: the raw message bytes
    :param strict: when true, a missing or invalid type tag string raises
        ``ValueError``; otherwise a warning is logged and the message is
        treated as having no arguments
    :return: tuple ``(address, typetags, arguments)`` where ``arguments`` is
        a tuple with one entry per type tag
    :raises ValueError: if the address does not start with a slash, if the
        type tag string is missing in strict mode, or on an unsupported tag
    """
    addr, ofs = split_oscstr(msg, 0)

    if not addr.startswith('/'):
        raise ValueError("OSC address pattern must start with a slash.")

    # type tag string must start with comma (ASCII 44)
    if ofs < len(msg) and msg[ofs] == 44:
        tags, ofs = split_oscstr(msg, ofs)
        tags = tags[1:]
    elif strict:
        raise ValueError("Missing/invalid OSC type tag string.")
    else:
        log.warning("Missing/invalid OSC type tag string." + ' Ignoring arguments.')
        tags = ''

    values = []

    for tag in tags:
        # Number of bytes consumed by fixed-width arguments; string and blob
        # arguments advance ``ofs`` themselves and leave this at zero.
        width = 0

        if tag in 'ifd':
            width = 8 if tag == 'd' else 4
            values.append(unpack('>' + tag, msg[ofs:ofs + width])[0])
        elif tag in 'sS':
            text, ofs = split_oscstr(msg, ofs)
            values.append(text)
        elif tag == 'b':
            blob, ofs = split_oscblob(msg, ofs)
            values.append(blob)
        elif tag in 'rm':
            width = 4
            values.append(unpack('BBBB', msg[ofs:ofs + width]))
        elif tag == 'c':
            width = 4
            values.append(chr(unpack('>I', msg[ofs:ofs + width])[0]))
        elif tag == 'h':
            width = 8
            values.append(unpack('>q', msg[ofs:ofs + width])[0])
        elif tag == 't':
            width = 8
            values.append(parse_timetag(msg, ofs))
        elif tag in 'TFNI':
            # These tags carry no payload; 'N' maps to None.
            values.append({'T': True, 'F': False, 'I': Impulse}.get(tag))
        else:
            raise ValueError("Type tag '%s' not supported." % tag)

        ofs += width

    return (addr, tags, tuple(values))
开发者ID:SpotlightKid,项目名称:micropython-osc,代码行数:51,代码来源:server.py


示例4: main

def main():
    """Receive Art-Net packets forever and mirror them onto the LED chain.

    Every pass re-displays the current frame (``initrgbdata``), then tries to
    read one datagram from ``sock``.  Packets that do not start with
    ``ARTNET_header``, that are too short to contain the fixed header, or
    whose opcode is not 0x5000 are ignored.  For opcode 0x5000 the payload is
    grouped into ``(r, g, b)`` tuples, limited to ``CHAIN_LEN`` entries, and
    becomes the new frame.

    The function never returns.
    """
    global initrgbdata

    while True:
        chain.show(initrgbdata)

        try:
            data = sock.recv(530)
        except OSError:
            # Nothing to read right now; catching only OSError keeps
            # KeyboardInterrupt able to stop the loop.
            continue

        if data[0:7] != ARTNET_header:
            continue

        # The fixed header (through the length field) occupies 18 bytes.
        if len(data) < 18:
            continue

        # Art-Net sends the opcode low byte first, so decode it explicitly as
        # little-endian instead of relying on the host byte order.
        opcode = ustruct.unpack('<H', data[8:10])[0]

        if opcode == 0x5000:
            data_length = ustruct.unpack('>H', data[16:18])[0]
            # Never trust the declared length beyond what was received, and
            # never build more pixels than the chain has.
            usable = min(data_length, len(data) - 18, CHAIN_LEN * 3)

            rgbdata = []
            for ndx in range(0, usable, 3):
                rgb = [0, 0, 0]
                # A trailing partial triple keeps zeros for missing channels.
                for i in range(min(3, usable - ndx)):
                    rgb[i] = int(data[18 + ndx + i])
                rgbdata.append(tuple(rgb))

            initrgbdata = rgbdata[0:CHAIN_LEN]
开发者ID:bennigraf,项目名称:wipy-artnet-to-ws2812,代码行数:48,代码来源:main.py


示例5: time

def time(ms_accuracy=False):
    """Ask the NTP server ``host`` for the current time.

    A 48-byte request whose first byte is 0x1b is sent over UDP port 123 and
    a 48-byte reply is read with a one second socket timeout.

    :param ms_accuracy: when true, also return the fractional part
    :return: the seconds value from reply bytes 40..43 minus ``NTP_DELTA``;
        with ``ms_accuracy`` a tuple ``(seconds, fraction)`` where the
        fraction is reply bytes 44..47 scaled by ``MILLIS_PER_SECOND / 2**32``
    """
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1b
    addr = socket.getaddrinfo(host, 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(NTP_QUERY, addr)
        msg = s.recv(48)
    finally:
        # Close the socket even when the request fails or times out.
        s.close()

    sec = struct.unpack("!I", msg[40:44])[0] - NTP_DELTA
    # The fraction is a 32-bit binary fraction of a second.
    frac = struct.unpack("!I", msg[44:48])[0] * MILLIS_PER_SECOND >> 32
    if ms_accuracy:
        return sec, frac
    return sec
开发者ID:puhitaku,项目名称:AutoChime,代码行数:16,代码来源:ntptime_kai.py


示例6: rd16

	def rd16( self, addr ):
		""" Read one 16-bit word at addr and return it as an unsigned int. """
		self.__start( addr )
		raw = self.spi.read( 2 ) # the two bytes making up the word
		self.__end()
		# Decoded as big-endian; NOTE(review): the original author questioned
		# whether this was initially '<H' - confirm against the device.
		( word, ) = ustruct.unpack( '>H', raw )
		return word
开发者ID:mchobby,项目名称:pyboard-driver,代码行数:7,代码来源:gd.py


示例7: _handle_connack

    def _handle_connack(self):
        """Process the pending CONNACK packet.

        The two-byte body is decoded into ``flags`` and ``result``.  A result
        of zero marks the client as connected.  When ``self.on_connect`` is
        set it is called with ``(self, self._userdata, flags_dict, result)``
        where ``flags_dict['session present']`` is bit 0 of ``flags``.

        :return: ``MQTT_ERR_PROTOCOL`` if the body is not exactly two bytes
            or the result code is 6 or above, ``MQTT_ERR_CONN_REFUSED`` for
            result codes 1..5, and 0 when the connection was accepted
        """
        print("_handle_connack") #needed

        body = self._in_packet['packet']
        if len(body) != 2:
            return MQTT_ERR_PROTOCOL

        flags, result = struct.unpack("!BB", body)
        accepted = result == 0
        # NOTE: no downgrade to MQTT v3.1 is attempted here.

        if accepted:
            self._state = mqtt_cs_connected
            print("_handle_connack: self._state =", self._state)

        if self.on_connect:
            flags_dict = {'session present': flags & 0x01}
            self.on_connect(self, self._userdata, flags_dict, result)

        if accepted:
            return 0
        if 0 < result < 6:
            return MQTT_ERR_CONN_REFUSED
        return MQTT_ERR_PROTOCOL
开发者ID:slzatz,项目名称:miscellaneous,代码行数:25,代码来源:umqtt3.py


示例8: _register_short

    def _register_short(self, register, value=None, buf=bytearray(2)):
        """Read or write a signed 16-bit little-endian register over I2C.

        :param register: register address passed to the I2C memory calls
        :param value: when ``None`` the register is read; otherwise this
            value is packed and written
        :param buf: two-byte scratch buffer; the default is created once at
            definition time and is therefore shared between calls
        :return: the decoded register value for a read, or whatever
            ``i2c.writeto_mem`` returns for a write
        """
        if value is not None:
            ustruct.pack_into("<h", buf, 0, value)
            return self.i2c.writeto_mem(self.address, register, buf)

        self.i2c.readfrom_mem_into(self.address, register, buf)
        (result,) = ustruct.unpack("<h", buf)
        return result
开发者ID:radiumray,项目名称:mdxly,代码行数:7,代码来源:mpu9250.py


示例9: altitude

 def altitude(self):
     """Read the altitude as calculated based on the sensor pressure and
     previously configured pressure at sea-level.  This will return a
     value in meters.  Set the sea-level pressure by updating the
     sealevel_pressure property first to get a more accurate altitude value.

     The call blocks, polling in 10 ms steps, until the sensor reports that
     new data is ready.
     """
     # First poll for a measurement to be finished.
     self._poll_reg1(_MPL3115A2_CTRL_REG1_OST)
     # Set control bits for altitude reading.
     self._ctrl_reg1 |= 0b10000000  # Turn on bit 7, ALT.
     self._write_u8(_MPL3115A2_CTRL_REG1, self._ctrl_reg1)
     self._ctrl_reg1 |= 0b00000010   # Set OST to 1 to start measurement.
     self._write_u8(_MPL3115A2_CTRL_REG1, self._ctrl_reg1)
     # Poll status for PDR to be set.
     while self._read_u8(_MPL3115A2_REGISTER_STATUS) & _MPL3115A2_REGISTER_STATUS_PDR == 0:
         time.sleep(0.01)
     # Read 3 bytes of altitude data into buffer.
     # Yes even though this is the address of the pressure register it
     # returns altitude when the ALT bit is set above.
     self._read_into(_MPL3115A2_REGISTER_PRESSURE_MSB, self._BUFFER, count=3)
     # Reconstruct signed 32-bit altitude value (actually 24 bits shifted up
     # and then scaled down).
     self._BUFFER[3] = 0  # Top 3 bytes of buffer were read from the chip.
     raw = struct.unpack('>i', self._BUFFER[0:4])[0]
     # Scale down to meters.  The upper 16 bits of the 32-bit value hold the
     # whole meters, so the divisor is 2**16 = 65536 (65535 is slightly off).
     return raw / 65536.0
开发者ID:eiselekd,项目名称:hw,代码行数:26,代码来源:adafruit_mpl3115a2.py


示例10: minmax

def minmax():
  """Scan 64 consecutive 16-bit readings and derive the display scaling.

  The readings are taken two bytes apart starting at ``register`` and
  converted with ``getval``.  The globals ``rate`` and ``offset`` are then
  set from the observed range (with some margin) and the results are
  printed on the LCD.
  """
  global offset
  global rate

  temps = [
    getval(ustruct.unpack('<h', i2c.readfrom_mem(i2c_address, register + 2 * idx, 2))[0])
    for idx in range(64)
  ]
  # The leading sentinels reproduce the initial bounds of 1000 and -1000.
  min_temp = min([1000] + temps)
  max_temp = max([-1000] + temps)

  # add some margin
  span = (max_temp - min_temp) * 1.4
  rate = len(colors) / span
  offset = min_temp * 0.8

  lcd.clear()
  rows = (
    ('min:    ', min_temp),
    ('max:    ', max_temp),
    ('rate:   ', rate),
    ('offset: ', offset),
  )
  for line_no, (label, value) in enumerate(rows):
    lcd.print(label + '{:.2f}'.format(value), 0, line_no * 10)
开发者ID:SWITCHSCIENCE,项目名称:samplecodes,代码行数:26,代码来源:thermography.py


示例11: getpwnam

def getpwnam(user):
    """Look up the password database entry for ``user``.

    :param user: value passed straight to the underlying ``getpwnam_`` call
    :return: a ``struct_passwd`` built from the seven unpacked fields
    :raises KeyError: if ``getpwnam_`` returns a false value
    """
    entry_ptr = getpwnam_(user)
    if not entry_ptr:
        raise KeyError("getpwnam(): name not found: {}".format(user))

    layout = "SSIISSS"
    raw = uctypes.bytes_at(entry_ptr, ustruct.calcsize(layout))
    return struct_passwd(*ustruct.unpack(layout, raw))
开发者ID:SpotlightKid,项目名称:micropython-lib,代码行数:8,代码来源:pwd.py


示例12: read

 def read(self):
     """Return the measurement result for the current mode in lx.

     The device needs to be on and its mode set before calling this.
     """
     # The two bytes read from the bus are combined with the first byte as
     # the low byte (identical to reading '>H' and then swapping the bytes).
     # NOTE(review): confirm this byte order against the sensor datasheet.
     count = unpack('<H', self.bus.readfrom(self.addr, 2))[0]
     if (self.mode & 0x03) == 0x01:
         mode2coeff = 2
     else:
         mode2coeff = 1
     ratio = 1/(1.2 * (self._sensitivity/69.0) * mode2coeff)
     return ratio*count
开发者ID:PinkInk,项目名称:upylib,代码行数:8,代码来源:__init__.py


示例13: draw_char

 def draw_char(self,ch,x,y,*args,**kwargs):
     """Draw the glyph for ``ch`` with its top-left corner at ``(x, y)``.

     Nothing is drawn when the position is at or beyond the right/bottom
     edge, or more than one glyph size before the left/top edge.  Extra
     positional and keyword arguments are forwarded to ``self._pixel``.
     """
     off_screen = (x < -self._font_width or x >= self._width
                   or y < -self._font_height or y >= self._height)
     if off_screen:
         return
     for col in range(self._font_width):
         # Each glyph column is one byte, stored after a 2-byte file header.
         self._font.seek(2 + (ord(ch) * self._font_width) + col)
         (column_bits,) = ustruct.unpack('B', self._font.read(1))
         for row in range(self._font_height):
             # Bit ``row`` of the column byte marks a lit pixel.
             if (column_bits >> row) & 0x1:
                 self._pixel(x + col, y + row, *args, **kwargs)
开发者ID:radiumray,项目名称:mdxly,代码行数:9,代码来源:matrix.py


示例14: master

def master():
    """Run the master side of the NRF24L01 round-trip test.

    Repeatedly sends a packet holding the current tick count and an LED
    pattern, then waits up to 250 ms for a reply and prints the outcome.
    The loop stops as soon as either the number of successes or the number
    of failures reaches 16.
    """
    csn = Pin(cfg['csn'], mode=Pin.OUT, value=1)
    ce = Pin(cfg['ce'], mode=Pin.OUT, value=0)
    if cfg['spi'] == -1:
        # An id of -1 means the bus is built from the configured pins.
        bus = SPI(-1, sck=Pin(cfg['sck']), mosi=Pin(cfg['mosi']), miso=Pin(cfg['miso']))
    else:
        bus = SPI(cfg['spi'])
    nrf = NRF24L01(bus, csn, ce, payload_size=8)

    nrf.open_tx_pipe(pipes[0])
    nrf.open_rx_pipe(1, pipes[1])
    nrf.start_listening()

    wanted = 16
    ok_count = 0
    fail_count = 0
    led_state = 0

    print('NRF24L01 master mode, sending %d packets...' % wanted)

    while ok_count < wanted and fail_count < wanted:
        # The radio has to leave receive mode before it can transmit.
        nrf.stop_listening()
        millis = utime.ticks_ms()
        # Walk a single set bit through the low four bits, restarting at 1.
        led_state = max(1, (led_state << 1) & 0x0f)
        print('sending:', millis, led_state)
        try:
            nrf.send(struct.pack('ii', millis, led_state))
        except OSError:
            # Send errors are ignored; a lost packet shows up as a timeout.
            pass

        nrf.start_listening()

        # Poll for a reply until one arrives or 250 ms have elapsed.
        wait_started = utime.ticks_ms()
        timed_out = False
        while not (nrf.any() or timed_out):
            timed_out = utime.ticks_diff(utime.ticks_ms(), wait_started) > 250

        if timed_out:
            print('failed, response timed out')
            fail_count += 1
        else:
            got_millis = struct.unpack('i', nrf.recv())[0]

            # print response and round-trip delay
            print('got response:', got_millis, '(delay', utime.ticks_diff(utime.ticks_ms(), got_millis), 'ms)')
            ok_count += 1

        # Pause between rounds.
        utime.sleep_ms(250)

    print('master finished sending; successes=%d, failures=%d' % (ok_count, fail_count))
开发者ID:DanielO,项目名称:micropython,代码行数:57,代码来源:nrf24l01test.py


示例15: _handle_publish

    def _handle_publish(self):
        """Decode the pending PUBLISH packet and hand it to ``on_message``.

        The packet body starts with a 16-bit network-order topic length,
        followed by the topic and then the payload.  When
        ``self.on_message`` is set it is called with
        ``(self, self._userdata, msg)`` where ``msg`` is a dict with the keys
        ``'topic'`` (str, decoded as UTF-8), ``'payload'`` (bytes) and
        ``'timestamp'`` (``time.time()`` at decode time).

        :return: 0 on success, ``MQTT_ERR_PROTOCOL`` when the body is shorter
            than the length prefix or the declared topic length exceeds the
            data that follows
        """
        print("_handle_publish") #needed after packet_handle
        body = self._in_packet['packet']
        if len(body) < 2:
            return MQTT_ERR_PROTOCOL

        # "!H" is a network-order unsigned short; for "s" the count is the
        # byte length of a single string, not a repeat count.
        slen, rest = struct.unpack("!H" + str(len(body) - 2) + 's', body)
        if slen > len(rest):
            # A topic longer than the packet is a malformed packet.
            return MQTT_ERR_PROTOCOL

        topic = rest[:slen]
        payload = rest[slen:]

        # Guard the callback the same way the other packet handlers do.
        if self.on_message:
            msg = {}
            msg['topic'] = topic.decode('utf-8')
            msg['payload'] = payload
            msg['timestamp'] = time.time()
            self.on_message(self, self._userdata, msg)
        return 0
开发者ID:slzatz,项目名称:miscellaneous,代码行数:19,代码来源:umqtt3.py


示例16: __init__

    def __init__(self,
                 mode=BME280_OSAMPLE_1,
                 address=BME280_I2CADDR,
                 i2c=None,
                 **kwargs):
        """Set up the sensor and load its calibration data over I2C.

        :param mode: oversampling setting; must be one of the
            ``BME280_OSAMPLE_*`` constants
        :param address: I2C address of the sensor
        :param i2c: I2C bus object used for all transfers (required)
        :param kwargs: accepted and ignored
        :raises ValueError: if ``mode`` is not a known oversampling value or
            ``i2c`` is ``None``
        """
        # Check that mode is valid.
        if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                        BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
            raise ValueError(
                'Unexpected mode value {0}. Set mode to one of '
                'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
                'BME280_OSAMPLE_8, or BME280_OSAMPLE_16'.format(mode))
        self._mode = mode
        self.address = address
        if i2c is None:
            raise ValueError('An I2C object is required.')
        self.i2c = i2c

        # load calibration data
        dig_88_a1 = self.i2c.readfrom_mem(self.address, 0x88, 26)
        dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0xE1, 7)
        # The second-to-last byte of the first block is skipped.
        self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P1, \
            self.dig_P2, self.dig_P3, self.dig_P4, self.dig_P5, \
            self.dig_P6, self.dig_P7, self.dig_P8, self.dig_P9, \
            _, self.dig_H1 = unpack("<HhhHhhhhhhhhBB", dig_88_a1)

        # Only the first three of the seven bytes are decoded here, so use
        # unpack_from: unpack() may insist on an exactly sized buffer.
        self.dig_H2, self.dig_H3 = unpack_from("<hB", dig_e1_e7, 0)

        # H4: signed byte 3 supplies the upper bits, low nibble of byte 4
        # the lower four bits.
        e4_sign = unpack_from("<b", dig_e1_e7, 3)[0]
        self.dig_H4 = (e4_sign << 4) | (dig_e1_e7[4] & 0xF)

        # H5: signed byte 5 supplies the upper bits, high nibble of byte 4
        # the lower four bits.
        e6_sign = unpack_from("<b", dig_e1_e7, 5)[0]
        self.dig_H5 = (e6_sign << 4) | (dig_e1_e7[4] >> 4)

        self.dig_H6 = unpack_from("<b", dig_e1_e7, 6)[0]

        self.i2c.writeto_mem(self.address, BME280_REGISTER_CONTROL,
                             bytearray([0x3F]))
        self.t_fine = 0

        # temporary data holders which stay allocated
        self._l1_barray = bytearray(1)
        self._l8_barray = bytearray(8)
        self._l3_resultarray = array("i", [0, 0, 0])
开发者ID:eztmondom,项目名称:test1,代码行数:43,代码来源:bme280.py


示例17: __init__

    def __init__(self,
                 mode=BME280_OSAMPLE_8,
                 address=BME280_I2CADDR,
                 i2c=None,
                 **kwargs):
        """Set up the sensor and load its calibration data over I2C.

        :param mode: oversampling setting; must be one of the
            ``BME280_OSAMPLE_*`` constants
        :param address: I2C address of the sensor
        :param i2c: I2C bus object used for all transfers (required)
        :param kwargs: accepted and ignored
        :raises ValueError: if ``mode`` is not a known oversampling value or
            ``i2c`` is ``None``
        """
        # Check that mode is valid.
        if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                        BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
            raise ValueError(
                'Unexpected mode value {0}. Set mode to one of '
                'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
                'BME280_OSAMPLE_8, or BME280_OSAMPLE_16'.format(mode))
        self._mode = mode
        self.address = address
        if i2c is None:
            raise ValueError('An I2C object is required.')
        self.i2c = i2c
        self.__sealevel = 101325

        # load calibration data
        dig_88_a1 = self.i2c.readfrom_mem(self.address, 0x88, 26)
        dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0xE1, 7)

        # The second-to-last byte of the first block is skipped.
        self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P1, \
            self.dig_P2, self.dig_P3, self.dig_P4, self.dig_P5, \
            self.dig_P6, self.dig_P7, self.dig_P8, self.dig_P9, \
            _, self.dig_H1 = unpack("<HhhHhhhhhhhhBB", dig_88_a1)

        self.dig_H2, self.dig_H3, self.dig_H4,\
            self.dig_H5, self.dig_H6 = unpack("<hBbhb", dig_e1_e7)
        # unfold H4, H5, keeping care of a potential sign; H4 must be
        # completed first because it uses the low nibble of the raw H5 word
        self.dig_H4 = (self.dig_H4 * 16) + (self.dig_H5 & 0xF)
        self.dig_H5 //= 16

        self.i2c.writeto_mem(self.address, BME280_REGISTER_CONTROL,
                             bytearray([0x3F]))
        self.t_fine = 0

        # temporary data holders which stay allocated
        self._l1_barray = bytearray(1)
        self._l8_barray = bytearray(8)
        self._l3_resultarray = array("i", [0, 0, 0])
开发者ID:Hiverize,项目名称:Sensorbeuten,代码行数:42,代码来源:bme280.py


示例18: time

def time():
    """Ask the NTP server ``host`` for the current time.

    A 48-byte request whose first byte is 0x1b is sent over UDP port 123 and
    a 48-byte reply is read with a one second socket timeout.

    :return: the 32-bit seconds value from reply bytes 40..43 minus
        ``NTP_DELTA``
    """
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1b
    addr = socket.getaddrinfo(host, 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(NTP_QUERY, addr)
        msg = s.recv(48)
    finally:
        # Close the socket even when the request fails or times out.
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:19emtuck,项目名称:micropython,代码行数:11,代码来源:ntptime.py


示例19: ntp

def ntp(): # noqa
    """Ask ``pool.ntp.org`` for the current time.

    A 48-byte request whose first byte is 0x1b is sent over UDP port 123 and
    a 48-byte reply is read with a one second socket timeout.

    :return: the 32-bit seconds value from reply bytes 40..43 minus
        ``NTP_DELTA``
    """
    ntp_query = bytearray(48)
    ntp_query[0] = 0x1b
    addr = socket.getaddrinfo('pool.ntp.org', 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(ntp_query, addr)
        msg = s.recv(48)
    finally:
        # Close the socket even when the request fails or times out.
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:chassing,项目名称:pyq2,代码行数:11,代码来源:untplib.py


示例20: decode_payload

    def decode_payload(payload):
        """
        Decode message payload
        :param payload: byte stream representing the message payload; the
            first 32 bytes are read as eight big-endian 32-bit floats
        :return: a list of 8 float [stab_Kp,stab_Kd,stab_Ki,Max Increment,gyro_Kp,gyro_Kd,gyro_Ki,gyro_Max Increment]
        """
        return [
            ustruct.unpack('>f', payload[start:start + 4])[0]
            for start in range(0, 32, 4)
        ]
开发者ID:Sokrates80,项目名称:air-py,代码行数:12,代码来源:ap_save_pid_settings.py



注:本文中的ustruct.unpack函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utGenerator.run_test函数代码示例发布时间:2022-05-27
下一篇:
Python ustruct.pack_into函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap