Les principales nouveautés sont
- Un système de hook a été intégré afin de pouvoir customiser le fonctionnement. Cela peut être utilisé par exemple pour simuler une réponse en erreur. Je posterai bientôt un exemple pour illustrer le principe des hooks.
- Modbus Test Kit propose à présent un simulateur prêt à l'emploi qui peut servir de base à d'autres serveurs comme dans l'exemple ci-dessous
- Le simulateur intègre une API simple permettant de le contrôler depuis un autre programme. L'idée est de faciliter l'utilisation de Modbus Test Kit dans un jeu de tests unitaires ce qui est l'idée première de cet outil.
Le code ci-dessous est un serveur modbus qui retourne la charge CPU de la machine Windows qui l'exécute. Il est basé sur le simulateur de Modbus Test Kit. Cela montre comment créer en quelques lignes un simulateur qui vous rendra peut-être de bons services. Voici le code.
from modbus_tk.simulator import *
from modbus_tk.simulator_rpc_client import SimulatorRpcClient
from modbus_tk.utils import WorkerThread
from modbus_tk.defines import *
from modbus_tk.modbus_tcp import TcpServer
import time
class SystemDataCollector:
"""The class in charge of getting the CPU load"""
def __init__(self, refresh_rate_in_sec):
"""Constructor"""
self._simu = SimulatorRpcClient()
self._max_count = refresh_rate_in_sec * 10
self._count = self._max_count-1
def collect(self):
"""get the CPU load thanks to WMI"""
try:
self._count += 1
if self._count >= self._max_count:
self._count = 0
#WMI get the load percentage of the machine
from win32com.client import GetObject
wmi = GetObject('winmgmts:')
cpu = wmi.InstancesOf('Win32_Processor')
for (_cpu, i) in zip(cpu, xrange(10)):
value = _cpu.Properties_('LoadPercentage').Value
cpu_usage = int(str(value)) if value else 0
#execute a RPC command for changing the value
self._simu.set_values(1, "Cpu", i, (cpu_usage, ))
except Exception, excpt:
LOGGER.debug("SystemDataCollector error: %s", str(excpt))
time.sleep(0.1)
if __name__ == "__main__":
#create the object for getting CPU data
data_collector = SystemDataCollector(5)
#create the thread in charge of calling the data collector
system_monitor = WorkerThread(data_collector.collect)
#create the modbus TCP simulator and one slave
#and one block of analog inputs
simu = Simulator(TcpServer())
slave = simu.server.add_slave(1)
slave.add_block("Cpu", ANALOG_INPUTS, 0, 10)
try:
LOGGER.info("'quit' for closing the server")
#start the data collect
system_monitor.start()
#start the simulator! will block until quit command is received
simu.start()
except Exception, excpt:
print excpt
finally:
#close the simulator
simu.close()
#stop the data collect
system_monitor.stop()