diff --git a/pySim/transport/modem_atcmd.py b/pySim/transport/modem_atcmd.py index eef38cb3..43614f1a 100644 --- a/pySim/transport/modem_atcmd.py +++ b/pySim/transport/modem_atcmd.py @@ -32,53 +32,85 @@ class ModemATCommandLink(LinkBase): def __init__(self, device:str='/dev/ttyUSB0', baudrate:int=115200, **kwargs): super().__init__(**kwargs) self._sl = serial.Serial(device, baudrate, timeout=5) + self._echo = False # this will be auto-detected by _check_echo() self._device = device self._atr = None + # Check the AT interface + self._check_echo() + # Trigger initial reset self.reset_card() def __del__(self): self._sl.close() - def send_at_cmd(self, cmd): + def send_at_cmd(self, cmd, timeout=0.2, patience=0.002): # Convert from string to bytes, if needed bcmd = cmd if type(cmd) is bytes else cmd.encode() bcmd += b'\r' + # Clean input buffer from previous/unexpected data + self._sl.reset_input_buffer() + # Send command to the modem - log.debug('Sending AT command: %s' % cmd) + log.debug('Sending AT command: %s', cmd) try: wlen = self._sl.write(bcmd) assert(wlen == len(bcmd)) except: raise ReaderError('Failed to send AT command: %s' % cmd) - # Give the modem some time... - time.sleep(0.3) + rsp = b'' + its = 1 + t_start = time.time() + while True: + rsp = rsp + self._sl.read(self._sl.in_waiting) + if rsp.endswith(b'OK\r\n'): + log.debug('Command finished with result: OK') + break + if rsp.endswith(b'ERROR\r\n'): + log.debug('Command finished with result: ERROR') + break + if time.time() - t_start >= timeout: + log.debug('Command finished with timeout >= %ss', timeout) + break + time.sleep(patience) + its += 1 + log.debug('Command took %0.6fs (%d cycles a %fs)', time.time() - t_start, its, patience) - # Read the response - try: - # Skip characters sent back - self._sl.read(wlen) - # Read the rest - rsp = self._sl.read_all() + if self._echo: + # Skip echo chars + rsp = rsp[wlen:] + rsp = rsp.strip() + rsp = rsp.split(b'\r\n\r\n') - # Strip '\r\n' - rsp = rsp.strip() - # Split into a list - rsp = rsp.split(b'\r\n\r\n') - except: - raise ReaderError('Failed parse response to AT command: %s' % cmd) - - log.debug('Got response from modem: %s' % rsp) + log.debug('Got response from modem: %s', rsp) return rsp - def reset_card(self): - # Make sure that we can talk to the modem - if self.send_at_cmd('AT') != [b'OK']: - raise ReaderError('Failed to connect to modem') + def _check_echo(self): + """Verify the correct response to 'AT' command + and detect if inputs are echoed by the device + Although echo of inputs can be enabled/disabled via + ATE1/ATE0, respectively, we rather detect the current + configuration of the modem without any change. + """ + # Next command shall not strip the echo from the response + self._echo = False + result = self.send_at_cmd('AT') + + # Verify the response + if len(result) > 0: + if result[-1] == b'OK': + self._echo = False + return + elif result[-1] == b'AT\r\r\nOK': + self._echo = True + return + raise ReaderError('Interface \'%s\' does not respond to \'AT\' command' % self._device) + + def reset_card(self): # Reset the modem, just to be sure if self.send_at_cmd('ATZ') != [b'OK']: raise ReaderError('Failed to reset the modem')