mirror of
https://gitea.osmocom.org/sim-card/pysim.git
synced 2026-03-17 02:48:34 +03:00
Previous implementation waits 300ms for response after each command issued. But many commands finish earlier. This patch improves the command execution time by frequently checking for the response to complete (i.e. ends with OK or ERROR), or the occurence of a timeout (default 200ms). Timeout can be adapted per command to support long response times of certain commands like AT+COPS=? (network search) Execution time benchmark (20 AT commands/responses): Previous: 6.010s (100.0%) New code: 0.045s ( 0.7%) Change-Id: I69b1cbc0a20d54791e5800bf27ebafc2c8606d93
158 lines
4.6 KiB
Python
158 lines
4.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2020 Vadim Yanitskiy <axilirator@gmail.com>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import logging as log
|
|
import serial
|
|
import time
|
|
import re
|
|
|
|
from pySim.transport import LinkBase
|
|
from pySim.exceptions import *
|
|
|
|
# HACK: if somebody needs to debug this thing
|
|
# log.root.setLevel(log.DEBUG)
|
|
|
|
class ModemATCommandLink(LinkBase):
|
|
"""Transport Link for 3GPP TS 27.007 compliant modems."""
|
|
def __init__(self, device:str='/dev/ttyUSB0', baudrate:int=115200, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self._sl = serial.Serial(device, baudrate, timeout=5)
|
|
self._echo = False # this will be auto-detected by _check_echo()
|
|
self._device = device
|
|
self._atr = None
|
|
|
|
# Check the AT interface
|
|
self._check_echo()
|
|
|
|
# Trigger initial reset
|
|
self.reset_card()
|
|
|
|
def __del__(self):
|
|
self._sl.close()
|
|
|
|
def send_at_cmd(self, cmd, timeout=0.2, patience=0.002):
|
|
# Convert from string to bytes, if needed
|
|
bcmd = cmd if type(cmd) is bytes else cmd.encode()
|
|
bcmd += b'\r'
|
|
|
|
# Clean input buffer from previous/unexpected data
|
|
self._sl.reset_input_buffer()
|
|
|
|
# Send command to the modem
|
|
log.debug('Sending AT command: %s', cmd)
|
|
try:
|
|
wlen = self._sl.write(bcmd)
|
|
assert(wlen == len(bcmd))
|
|
except:
|
|
raise ReaderError('Failed to send AT command: %s' % cmd)
|
|
|
|
rsp = b''
|
|
its = 1
|
|
t_start = time.time()
|
|
while True:
|
|
rsp = rsp + self._sl.read(self._sl.in_waiting)
|
|
if rsp.endswith(b'OK\r\n'):
|
|
log.debug('Command finished with result: OK')
|
|
break
|
|
if rsp.endswith(b'ERROR\r\n'):
|
|
log.debug('Command finished with result: ERROR')
|
|
break
|
|
if time.time() - t_start >= timeout:
|
|
log.debug('Command finished with timeout >= %ss', timeout)
|
|
break
|
|
time.sleep(patience)
|
|
its += 1
|
|
log.debug('Command took %0.6fs (%d cycles a %fs)', time.time() - t_start, its, patience)
|
|
|
|
if self._echo:
|
|
# Skip echo chars
|
|
rsp = rsp[wlen:]
|
|
rsp = rsp.strip()
|
|
rsp = rsp.split(b'\r\n\r\n')
|
|
|
|
log.debug('Got response from modem: %s', rsp)
|
|
return rsp
|
|
|
|
def _check_echo(self):
|
|
"""Verify the correct response to 'AT' command
|
|
and detect if inputs are echoed by the device
|
|
|
|
Although echo of inputs can be enabled/disabled via
|
|
ATE1/ATE0, respectively, we rather detect the current
|
|
configuration of the modem without any change.
|
|
"""
|
|
# Next command shall not strip the echo from the response
|
|
self._echo = False
|
|
result = self.send_at_cmd('AT')
|
|
|
|
# Verify the response
|
|
if len(result) > 0:
|
|
if result[-1] == b'OK':
|
|
self._echo = False
|
|
return
|
|
elif result[-1] == b'AT\r\r\nOK':
|
|
self._echo = True
|
|
return
|
|
raise ReaderError('Interface \'%s\' does not respond to \'AT\' command' % self._device)
|
|
|
|
def reset_card(self):
|
|
# Reset the modem, just to be sure
|
|
if self.send_at_cmd('ATZ') != [b'OK']:
|
|
raise ReaderError('Failed to reset the modem')
|
|
|
|
# Make sure that generic SIM access is supported
|
|
if self.send_at_cmd('AT+CSIM=?') != [b'OK']:
|
|
raise ReaderError('The modem does not seem to support SIM access')
|
|
|
|
log.info('Modem at \'%s\' is ready!' % self._device)
|
|
|
|
def connect(self):
|
|
pass # Nothing to do really ...
|
|
|
|
def disconnect(self):
|
|
pass # Nothing to do really ...
|
|
|
|
def wait_for_card(self, timeout=None, newcardonly=False):
|
|
pass # Nothing to do really ...
|
|
|
|
def _send_apdu_raw(self, pdu):
|
|
# Make sure pdu has upper case hex digits [A-F]
|
|
pdu = pdu.upper()
|
|
|
|
# Prepare the command as described in 8.17
|
|
cmd = 'AT+CSIM=%d,\"%s\"' % (len(pdu), pdu)
|
|
|
|
# Send AT+CSIM command to the modem
|
|
# TODO: also handle +CME ERROR: <err>
|
|
rsp = self.send_at_cmd(cmd)
|
|
if len(rsp) != 2 or rsp[-1] != b'OK':
|
|
raise ReaderError('APDU transfer failed: %s' % str(rsp))
|
|
rsp = rsp[0] # Get rid of b'OK'
|
|
|
|
# Make sure that the response has format: b'+CSIM: %d,\"%s\"'
|
|
try:
|
|
result = re.match(b'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
|
|
(rsp_pdu_len, rsp_pdu) = result.groups()
|
|
except:
|
|
raise ReaderError('Failed to parse response from modem: %s' % rsp)
|
|
|
|
# TODO: make sure we have at least SW
|
|
data = rsp_pdu[:-4].decode()
|
|
sw = rsp_pdu[-4:].decode()
|
|
return data, sw
|