-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlogger.py
More file actions
111 lines (100 loc) · 3.1 KB
/
logger.py
File metadata and controls
111 lines (100 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import csv
import time
import os
import subprocess
import termios
from datetime import datetime
from threading import Thread, Lock
def setup_serial(port):
fd = os.open(port, os.O_RDWR | os.O_NOCTTY)
attrs = termios.tcgetattr(fd)
attrs[0] = 0
attrs[1] = 0
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL
attrs[3] = 0
attrs[4] = termios.B115200
attrs[5] = termios.B115200
termios.tcsetattr(fd, termios.TCSANOW, attrs)
os.set_blocking(fd, True)
return fd
def read_line(fd):
line = b""
while True:
ch = os.read(fd, 1)
if ch == b"\n":
return line.decode("utf-8", errors="ignore").strip()
line += ch
def find_usb():
try:
result = subprocess.run(['lsblk', '-o', 'NAME,MOUNTPOINT', '-rn'], capture_output=True, text=True)
for line in result.stdout.splitlines():
parts = line.split()
if len(parts) == 2:
mountpoint = parts[1]
if mountpoint.startswith('/media') or mountpoint.startswith('/mnt'):
return mountpoint
except:
pass
return None
def find_picos():
ports = []
for dev in sorted(os.listdir('/dev')):
if dev.startswith('ttyACM') or dev.startswith('ttyUSB'):
ports.append(f'/dev/{dev}')
return ports
latest = {"pico1": None, "pico2": None}
lock = Lock()
def read_pico(port, name):
while True:
try:
fd = setup_serial(port)
print(f"Connected to {name} on {port}")
while True:
line = read_line(fd)
if "ADC:" in line:
try:
value = line.split("ADC:")[1].split("Counter:")[0].strip()
with lock:
latest[name] = value
except:
pass
except Exception as e:
print(f"{name} error: {e}, retrying...")
with lock:
latest[name] = None
time.sleep(2)
# Wait for USB
print("Waiting for USB drive...")
usb_path = None
while usb_path is None:
usb_path = find_usb()
time.sleep(1)
CSV_PATH = os.path.join(usb_path, "ride_height_log.csv")
print(f"Saving to {CSV_PATH}")
# Wait for at least 1 Pico
print("Waiting for Picos...")
while True:
ports = find_picos()
if len(ports) >= 1:
break
time.sleep(1)
print(f"Found: {ports}")
Thread(target=read_pico, args=(ports[0], "pico1"), daemon=True).start()
if len(ports) >= 2:
Thread(target=read_pico, args=(ports[1], "pico2"), daemon=True).start()
time.sleep(2)
# Log at 100hz
file_exists = os.path.exists(CSV_PATH)
with open(CSV_PATH, "a", newline="") as csvfile:
writer = csv.writer(csvfile)
if not file_exists:
writer.writerow(["timestamp", "pico1_value", "pico2_value"])
print("Logging started!")
while True:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
with lock:
v1 = latest["pico1"] or ""
v2 = latest["pico2"] or ""
writer.writerow([timestamp, v1, v2])
csvfile.flush()
time.sleep(0.01)