Merge pull request #512 from CoderFX/fix/pisugarx-i2c-robustness

fix(pisugarx): improve I2C thread safety, error recovery, and write protection
This commit is contained in:
Jayofelony
2026-03-11 07:18:42 +01:00
committed by GitHub

View File

@@ -66,6 +66,8 @@ class PiSugarServer:
self.lowpower_shutdown_level = 10
self.max_charge_voltage_protection = False
self.max_protection_level=80
self._lock = threading.Lock()
self._error_count = 0
# Start the device connection in a background thread
self.connection_thread = threading.Thread(
target=self._connect_device, daemon=True)
@@ -114,18 +116,16 @@ class PiSugarServer:
if self.model == 'PiSugar2' or self.model == 'PiSugar2Plus':
self.set_battery_notallow_charging() # Temporarily disable charging to get accurate battery voltage
time.sleep(0.05)
self.i2creg = []
new_regs = []
for i in range(0, 256, 32):
# Calculate the starting register address for the current read
current_register = 0 + i
# Calculate the length of the current read
current_length = min(32, 256 - i)
# Read data block
chunk = self._bus.read_i2c_block_data(
self.address, current_register, current_length)
# Add the read data block to the result list
self.i2creg.extend(chunk)
new_regs.extend(chunk)
time.sleep(0.1)
with self._lock:
self.i2creg = new_regs
logging.debug(f"Data length: {len(self.i2creg)}")
logging.debug(f"Data: {self.i2creg}")
if self.model == 'PiSugar3':
@@ -136,20 +136,21 @@ class PiSugarServer:
ctr1 = self.i2creg[0x02] # Read control register 1
self.power_plugged = (ctr1 & (1 << 7)) != 0 # Check if power is plugged in
self.allow_charging = (ctr1 & (1 << 6)) != 0 # Check if charging is allowed
if self.max_charge_voltage_protection:
try:
self._bus.write_byte_data(
self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) | 0b10000000)
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # Enable write protection
else:
self._bus.write_byte_data(
self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) & 0b01111111)
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # Enable write protection
if self.max_charge_voltage_protection:
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) | 0b10000000)
else:
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) & 0b01111111)
finally:
try:
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # Re-enable write protection
except Exception:
pass
elif self.model == 'PiSugar2':
high = self.i2creg[0xa3]
low = self.i2creg[0xa2]
@@ -194,20 +195,43 @@ class PiSugarServer:
self.shutdown()
pwnagotchi.shutdown()
time.sleep(3)
self._error_count = 0
except Exception as e:
logging.error(f"read error{e}")
self._error_count += 1
if self._error_count <= 3:
logging.error(f"[PiSugarX] I2C read error ({self._error_count}): {e}")
elif self._error_count == 4:
logging.error("[PiSugarX] I2C errors repeating, suppressing further logs")
if self._error_count >= 5:
self.ready = False
try:
self._bus.close()
time.sleep(1)
self._bus = smbus.SMBus(1)
self._error_count = 0
logging.info("[PiSugarX] I2C bus reconnected")
except Exception as bus_err:
logging.error(f"[PiSugarX] I2C bus reconnect failed: {bus_err}")
backoff = min(3 * (2 ** max(0, self._error_count - 2)), 30)
time.sleep(backoff)
continue
time.sleep(3)
def shutdown(self):
# logging.info("[PiSugarX] PiSugar set shutdown .")
if self.model == 'PiSugar3':
# Shutdown the power after 10 seconds
self._bus.write_byte_data(self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x09, 10)
self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data(
self.address, 0x02) & 0b11011111)
self._bus.write_byte_data(self.address, 0x0B, 0x00) # Enable write protection
logging.info("[PiSugarX] PiSugar shutdown in 10s.")
try:
self._bus.write_byte_data(self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x09, 10)
self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data(
self.address, 0x02) & 0b11011111)
logging.info("[PiSugarX] PiSugar3 shutdown in 10s.")
except Exception as e:
logging.error(f"[PiSugarX] Failed to send shutdown to PiSugar3: {e}")
finally:
try:
self._bus.write_byte_data(self.address, 0x0B, 0x00) # Re-enable write protection
except Exception:
pass
elif self.model == 'PiSugar2':
pass
elif self.model == 'PiSugar2Plus':