Magazine

Réaliser un serveur Modbus avec Python

Publié le 25 septembre 2009 par Luc
Je viens de mettre à jour la version 0.3 de Modbus Test Kit en ligne.

Les principales nouveautés sont
  • Un système de hook a été intégré afin de pouvoir customiser le fonctionnement. Cela peut être utilisé par exemple pour simuler une réponse en erreur. Je posterai bientôt un exemple pour illustrer le principe des hooks.
  • Modbus Test Kit propose à présent un simulateur prêt à l'emploi qui peut servir de base à d'autres serveurs comme dans l'exemple ci-dessous
  • Le simulateur intègre une API simple permettant de le contrôler depuis un autre programme. L'idée est de faciliter l'utilisation de Modbus Test Kit dans un jeu de tests unitaires ce qui est l'idée première de cet outil.


Le code ci-dessous est un serveur modbus qui retourne la charge CPU de la machine Windows qui l'exécute. Il est basé sur le simulateur de Modbus Test Kit. Cela montre comment créer en quelques lignes un simulateur qui vous rendra peut-être de bons services. Voici le code.

from modbus_tk.simulator import *
from modbus_tk.simulator_rpc_client import SimulatorRpcClient
from modbus_tk.utils import WorkerThread
from modbus_tk.defines import *
from modbus_tk.modbus_tcp import TcpServer
import time

class SystemDataCollector:        
    """The class in charge of getting the CPU load"""
    def __init__(self, refresh_rate_in_sec):
        """Constructor"""
        self._simu = SimulatorRpcClient()
        self._max_count = refresh_rate_in_sec * 10    
        self._count = self._max_count-1
        
    def collect(self):
        """get the CPU load thanks to WMI"""
        try:
            self._count += 1
            if self._count >= self._max_count:
                self._count = 0
                #WMI get the load percentage of the machine
                from win32com.client import GetObject
                wmi = GetObject('winmgmts:')
                cpu = wmi.InstancesOf('Win32_Processor')
                for (_cpu, i) in zip(cpu, xrange(10)):
                    value = _cpu.Properties_('LoadPercentage').Value
                    cpu_usage = int(str(value)) if value else 0
                    #execute a RPC command for changing the value
                    self._simu.set_values(1, "Cpu", i, (cpu_usage, ))
        except Exception, excpt:
            LOGGER.debug("SystemDataCollector error: %s", str(excpt))
        time.sleep(0.1)
        
if __name__ == "__main__":
    #create the object for getting CPU data
    data_collector = SystemDataCollector(5)
    #create the thread in charge of calling the data collector
    system_monitor = WorkerThread(data_collector.collect)
    
    #create the modbus TCP simulator and one slave
    #and one block of analog inputs
    simu = Simulator(TcpServer())
    slave = simu.server.add_slave(1)
    slave.add_block("Cpu", ANALOG_INPUTS, 0, 10)
    
    try:
        LOGGER.info("'quit' for closing the server")
        
        #start the data collect
        system_monitor.start()
        
        #start the simulator! will block until quit command is received
        simu.start()
            
    except Exception, excpt:
        print excpt
            
    finally:
        #close the simulator
        simu.close()
        #stop the data collect
        system_monitor.stop()


Retour à La Une de Logo Paperblog

A propos de l’auteur


Luc 11 partages Voir son profil
Voir son blog

l'auteur n'a pas encore renseigné son compte l'auteur n'a pas encore renseigné son compte