【问题标题】:Find string between two substrings, in a stream of data在数据流中查找两个子字符串之间的字符串
【发布时间】:2021-07-30 19:36:50
【问题描述】:

我有这个连续的串行数据流:

----------------------------------------
 
SENSOR COORDINATE         = 0
 
MEASURED RESISTANCE       = 3.70 kOhm
 
----------------------------------------
 
----------------------------------------
 
SENSOR COORDINATE         = 1
 
MEASURED RESISTANCE       = 3.70 kOhm
 
----------------------------------------
 
----------------------------------------
 
SENSOR COORDINATE         = 2
 
MEASURED RESISTANCE       = 3.69 kOhm
 
----------------------------------------

对于每次迭代,我希望能够获取值。传感器坐标值,电阻值。

我找到了使用 .split() 和使用正则表达式 ( Find string between two substrings),但问题是在我的情况下,我想要过滤的不是一个字符串,而是一个连续的流。

例如,.split() 会找到我的字符串,但它会将流分成两半。这在连续流中不止一次有效。

注意:在传感器坐标值之后,我有一个回车符。

编辑 1/3: 这是抓取串口数据的sn-p代码:

def readSerial():
    global after_id
    while ser.in_waiting:
        try:
            ser_bytes = ser.readline() #read data from the serial line
            ser_bytes = ser_bytes.decode("utf-8")
            text.insert("end", ser_bytes)
        except UnicodeDecodeError:
            print("UnicodeDecodeError")
    else:
        print("No data received")
    after_id=root.after(50,readSerial)

如果有人想知道,这是 arduino 端的 C 代码,用于发送数据:

Serial.println("----------------------------------------");
Serial.print("SENSOR COORDINATE         = ");
Serial.println(sensor_coord);
Serial.print("MEASURED RESISTANCE       = ");
double resistanse = ((period * GAIN_VALUE * 1000) / (4 * CAPACITOR_VALUE)) - R_BIAS_VALUE;
Serial.print(resistanse);
Serial.println(" kOhm");

编辑 2/3: 这是以前的方法:

def readSerial():
        global after_id
        while ser.in_waiting:
            try:
                ser_bytes = ser.readline() #read data from the serial line
                ser_bytes = ser_bytes.decode("utf-8")
                text.insert("end", ser_bytes)
                result = re.search.(, ser_bytes)
                print(result)
            except UnicodeDecodeError:
                print("UnicodeDecodeError")
        else:
            print("No data received")
        after_id=root.after(50,readSerial)

在另一次尝试中,我将这一行 result = re.search.(, ser_bytes) 更改为 result =ser_bytes.split("TE = ")

这是我收到的数据的图片(这是一个 tkinter 文本框架)。

编辑 3/3: 这是我实现 dracarys 算法的代码:

def readSerial():
    global after_id
    while ser.in_waiting:
        try:
            ser_bytes = ser.readline() 
            print(ser_bytes)
            ser_bytes = ser_bytes.decode("utf-8")
            print(ser_bytes)
            text.insert("end", ser_bytes)
           
            if "SENSOR COORDINATE" in ser_bytes:
               found_coordinate = True
               coordinate = int(ser_bytes.split("=")[1].strip())
               print("Coordinate",coordinate)
            if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
               found_coordinate = False
               resistance = float(ser_bytes.split("=")[1].split("kOhm")[0].strip())
               print("Resistance",resistance)
        
        except UnicodeDecodeError:
            print("UnicodeDecodeError")
    else:
        print("No data received")
    after_id=root.after(50,readSerial)

这是我得到的错误,在代码成功运行大约十秒钟后(我也包含了正常操作输出以供参考):

No data received
b'SENSOR COORDINATE         = 2\r\n'
SENSOR COORDINATE         = 2

Coordinate 2
b'MEASURED RESISTANCE       = 3.67 kOhm\r\n'
MEASURED RESISTANCE       = 3.67 kOhm

Resistance 3.67
b'----------------------------------------\r\n'
----------------------------------------

b'----------------------------------------\r\n'
----------------------------------------

b'SENSOR COORDINATE         = 3\r\n'
SENSOR COORDINATE         = 3

Coordinate 3
No data received
b'MEASURED RESISTANCE       = 3.78 kOhm\r\n'
MEASURED RESISTANCE       = 3.78 kOhm

Exception in Tkinter callback
Traceback (most recent call last):
  File "C:\Users\User1\AppData\Local\Programs\Python\Python38-32\lib\tkinter\__i
nit__.py", line 1883, in __call__
    return self.func(*args)
  File "C:\Users\User1\AppData\Local\Programs\Python\Python38-32\lib\tkinter\__i
nit__.py", line 804, in callit
    func(*args)
  File "tkinterWithPortsExperiment.py", line 73, in readSerial
    if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
UnboundLocalError: local variable 'found_coordinate' referenced before assignment

【问题讨论】:

  • 你能分享你尝试过的东西吗?有了这些有限的信息,我建议实现某种环形缓冲区(例如,使用双端队列)并对缓冲区内容执行处理。
  • 您的流是如何进入您的程序的?读取日志文件? stdin?插座?
  • “伙计们,解决这个问题对我来说非常重要。” 那么你应该! Stack Overflow 不是代码编写服务。你没有表现出自己解决问题的努力——你所做的只是将每一行插入一个文本框中。 “为我实现此功能”与此站点无关。你必须诚实地尝试,然后就你的算法或技术提出一个具体问题。请使用tour,阅读what's on-topic hereHow to Askquestion checklist,并提供minimal reproducible example
  • 一个提示让您开始:您根本不需要“拆分”。由于您一次得到的只是一行,因此解析每一行并解释结果。如果您找到了传感器坐标,请留意提供测量距离的直线。两者兼得后,记录它们并留意新的传感器坐标。
  • 如果你也是 arduino 方面的维护者,最好为每个传感器事件编写一行 JSON,然后可以轻松地逐行处理。

标签: python python-3.x regex string split


【解决方案1】:

正如我在 cmets 中所说,我觉得应该简化 Arduino 输出。正如@oliver_t 所说,每个传感器事件的一行 JSON 将是完美的。

如果你不能这样做,这里是解析这个的代码。

由于我没有任何方式逐行接收您的串行监视器输出,因此我通过将输出存储在 txt 文件中然后逐行读取来进行模拟。我希望这会有所帮助,因为您的问题是如何解析输入。

f = open('stream.txt', 'r')
global found_coordinate
found_coordinate = False
while True:
    line = f.readline()
    if not line:
        break
    
    if "SENSOR COORDINATE" in line:
        found_coordinate = True
        coordinate = int(line.split("=")[1].strip())
        print("Coordinate",coordinate)
    
    if "MEASURED RESISTANCE" in line and found_coordinate:
        found_coordinate = False
        resistance = float(line.split("=")[1].split("kOhm")[0].strip())
        print("Resistance",resistance)

我希望这会有所帮助,如果我对您的要求的理解有任何差异,请告诉我,以便我修复我的代码。

注意:您实际上可能不需要 .strip(),因为类型转换为 intfloat 可以解决这个问题,但是我仍然将它放在那里作为健全性检查

【讨论】:

  • 谢谢!有效!但是过了一会儿我得到了这个错误:`如果ser_bytes和found_coordinate中的“MEASURED RESISTANCE”:UnboundLocalError:在分配之前引用局部变量'found_coordinate'`
  • 它运行了几秒钟,然后它给了我这个错误!
  • 我会调查一下,你能给我完整的回溯吗?或者向它展示你是如何将这个逻辑集成到你的代码中的?
  • 是的,我会在几个小时内完成。
  • @user1584421 我添加了 found_coordinate 变量只是为了确保它只在找到坐标值后才检测到电阻值。但如果你不需要它,你可以删除它(我可以用它更新我的代码)。但我想看看更多细节,以便给你更明智的建议
【解决方案2】:

如果您可以更改您的 Arduino 代码,那么您也许可以利用 python 中的 json.load() 方法将字符串转换为更易于管理的内容。

我不喜欢 Arduinos(尽管有一个坐着伸手可及,盒子未打开,两年的大部分时间......)所以下面的代码可能更接近于伪代码而不是实际代码:

# This should (fingers crossed) build and send a separate message for each variable.
# It could relatively easily be combined into one message.

double resistanse = ((period * GAIN_VALUE * 1000) / (4 * CAPACITOR_VALUE)) - R_BIAS_VALUE;

##########################################################
# If you want to send a separate message for each variable
String sSensor = "{\"sensorcoord\":";
String sResistance = "{\"resistance\":";
String sEnd = "}"

String sensor_output = sSensor + sensor_coord + sEnd
Serial.println(sensor_output)
# output will be {"sensorcoord":1}

String resistance_output = sResistance + resistanse + sEnd
Serial.println(resistance_output)
# output will be {"resistance":3.7}

########################################################
# If you want to send one message holding both variables

String sSensor = "{\"sensorcoord\":";
String sResistance = ",\"resistance\":";
String sEnd = "}"

String combined_output = sSensor + sensor_coord + sResistance + resistance + sEnd
Serial.println(combined_output)
# output will be {"sensorcoord": 1,"resistance":3.7}

一旦将字符串输入 Python,您就可以使用 json.loads() 获取(格式正确的)文本字符串并将其转换为您可以更轻松地访问的对象:

import json

data = json.loads(textstringFromArduino)

# If you sent each value separately, you now need to work out which value you are receiving.
for key, value in data.items():
    if key == "sensorcoord":
      print(value)
    elif key == "resistance":
      print(value)


# If you sent both values in one message, it's a lot easier...

print(data['sensorcoord'])
print(data['resistance'])


【讨论】:

    【解决方案3】:

    这是你的readSerial 函数:

    def readSerial():
        global after_id
        while ser.in_waiting:
            try:
                ser_bytes = ser.readline() 
                print(ser_bytes)
                ser_bytes = ser_bytes.decode("utf-8")
                print(ser_bytes)
                text.insert("end", ser_bytes)
               
                if "SENSOR COORDINATE" in ser_bytes:
                   found_coordinate = True
                   coordinate = int(ser_bytes.split("=")[1].strip())
                   print("Coordinate",coordinate)
                if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
                   found_coordinate = False
                   resistance = float(ser_bytes.split("=")[1].split("kOhm")[0].strip())
                   print("Resistance",resistance)
            
            except UnicodeDecodeError:
                print("UnicodeDecodeError")
        else:
            print("No data received")
        after_id=root.after(50,readSerial)
    

    如您所知,您在两个 if 语句中的每一个中都定义了 found_coordinate。但是让我们看看你在哪里引用了found_coordinate 变量:

                if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
    

    这是您使用found_coordinate 变量的唯一地方,也是发生错误的地方。现在考虑一下,如果

                if "SENSOR COORDINATE" in ser_bytes:
    

    从未评估为True,然后found_coordinate = True 行从未遇到,这意味着found_coordinate 从未被定义。是的,还有另外一行是你定义的,但是只能在这个条件下执行:

                if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
    

    同样,found_coordinate 变量尚未定义,导致错误。您可能想知道:它是如何成功运行 10 秒而没有出现错误的?很简单:

    在 10 秒内,if "SENSOR COORDINATE" in ser_bytes: 全部评估为 False,因此 found_coordinate 变量从未被定义。但同时,10 秒内,"MEASURED RESISTANCE" in ser_bytes 也全部评估为False,所以程序没有继续 and found_coordinate,因为没有必要。错误发生时"MEASURED RESISTANCE" in ser_bytes 评估为True,使程序解析 and found_coordinate,其中found_coordinate 尚未定义。

    【讨论】:

      【解决方案4】:

      你的尝试几乎就在那里。 UnboundLocalError 发生是因为如果该线是阻力线,则变量 found_coordinate 未在您的函数中定义。您也应该将其定义为全局变量,因为您需要在多个函数调用中跟踪它。我很感兴趣,第一组坐标/阻力有效。也一样

      global after_id, found_coordinate
      

      在函数的开头。


      在您发布您的尝试之前,我写了这个答案。该方法与您的方法非常相似。从中使用你觉得有用的东西!

      你根本不需要split()。由于您一次得到的只是一行,因此解析每一行并解释结果。如果您找到了传感器坐标,请留意提供测量距离的直线。两者兼得后,记录它们并留意新的传感器坐标。

      1. 接收线
      2. 行中是否包含"SENSOR COORDINATE"
        • 是:解析号码并保存以备后用。
        • 不:什么都不做? 或者打印错误信息?
      3. 行中是否包含"MEASURED RESISTANCE"
        • 是:解析号码并保存。
          • 现在我们应该拥有我们配对的两个项目。保存它们以备后用。
        • 不:什么都不做? 或者打印错误信息?

      首先,让我们定义一个函数,只提取每行的数字:

      import re
      
      def get_numbers_from_line(line):
          try:
              numbers_rex = r"(\d+(?:\.\d+)*)"
              matched_text = re.findall(numbers_rex, line)[0]
              parsed_number = float(matched_text)
              return parsed_number
          except IndexError:
              print(f"No numbers found on line {line}")
          except ValueError:
              print(f"Couldn't parse number {matched_text}")
          
          # Only come here if error occurred. Not required, but for clarity we return None
          return None
      

      接下来,让我们定义一个函数来解析每一行并决定如何处理它。它将使用几个全局变量来跟踪其状态:

      all_pairs = []
      parsed_pair = []
      def parse_line(line):
          global all_pairs, parsed_pair
          if "SENSOR COORDINATE" in line:
              sensor_coord = get_numbers_from_line(line)
              
              if parsed_pair:
                  # Something already exists in parsed_pair. Tell user we are discarding it
                  print("Data already existed in parsed_pair when a new SENSOR COORDINATE was received")
                  print("Existing data was discarded")
                  print(f"parsed_pair: {parsed_pair}")
              
              if sensor_coord is not None:
                  # Make a new list containing only this newly parsed number
                  parsed_pair = [sensor_coord]
          
          elif "MEASURED RESISTANCE" in line:
              resistance = get_numbers_from_line(line)
              if not parsed_pair:
                  # parsed_pair is empty, so no sensor coordinate was recorded. 
                  # Ignore this line and wait for the start of the next pair of data
                  print("Received measured resistance without corresponding sensor coordinate")
                  print("Received data was discarded")
                  print(f"Received data: {resistance}")
              elif resistance is not None:
                  parsed_pair.append(resistance) # Add resistance to the pair
                  all_pairs.append(parsed_pair)  # Add the pair to all_pairs 
                  parsed_pair = []  # Make a new empty list for the next pair
                  print(f"Added pair {parsed_pair}")
      

      然后,在def readSerial():text.insert("end", ser_bytes)之后,调用这个函数。

      def readSerial():
          global after_id
          while ser.in_waiting:
              try:
                  ser_bytes = ser.readline() #read data from the serial line
                  ser_bytes = ser_bytes.decode("utf-8")
                  text.insert("end", ser_bytes)
                  parse_line(ser_bytes)
              except UnicodeDecodeError:
                  print("UnicodeDecodeError")
          else:
              print("No data received")
          after_id=root.after(50,readSerial)
      

      【讨论】:

      • 非常感谢!是的,我把它变成了一个全局变量并且它起作用了!在您的共同努力和用户的共同努力下,我能够解决这个问题!既然你们都帮了我,有没有办法在你们俩之间分分?每人一半?我真的不知道怎么分,也不想拖累任何人。
      • 你知道我可以用赏金做什么吗?大部分帮助来自用户 dracarys,但您帮助我使变量成为全局变量,并且您的帖子很棒。
      • 我不确定您是否可以拆分赏金。如果你不能,你应该把它奖励给 dracarys,因为他们的帖子对你帮助最大。
      • 非常抱歉,但没有办法分割赏金。我要亲自感谢您抽出时间来帮助我。非常感谢先生!祝你一切顺利!
      • 不用担心,很高兴为您提供帮助!不过,您似乎给出了错误的答案:)
      猜你喜欢
      • 2014-12-07
      • 2011-03-23
      • 2020-05-28
      • 2013-09-13
      • 1970-01-01
      相关资源
      最近更新 更多