Commit aeb128f0 authored by cermak's avatar cermak
Browse files

xray source device

Change-Id: I1c2b974cf0bd46a2427af48dd08e76b13237ef93
parent d5ea2ba1
......@@ -25,7 +25,7 @@ from time import sleep
from entangle import base
from entangle.core import Attr, Cmd, Prop, intrange, subdev, listof, \
ON, OFF, ALARM, FAULT, states, value_mapping
from entangle.core.errors import ConfigurationError, InvalidValue, \
CommunicationFailure, InvalidOperation
from entangle.device.serial import StringIO as SerialStringIO
......@@ -34,6 +34,10 @@ class StringIO(SerialStringIO):
"""Special serial StringIO device for thermofisher x-ray sources."""
def Communicate(self, msg):
#empty buffer
if self.read_availableLines():
#ask for value
for i in range(100):
if self.read_availableLines():
......@@ -44,4 +48,151 @@ class StringIO(SerialStringIO):
firstline = self.ReadLine()
if firstline != msg:
raise CommunicationFailure(f"command not repeated {firstline}!={msg}")
return self.ReadLine()
\ No newline at end of file
return self.ReadLine()
class XRaySource(base.DiscreteOutput):
"""Switching device for Thermo Fisher X-Ray sources.
This devices provides control for X-Ray source with interlocks.
attributes = {
'Actual voltage of the source.',
disallowed_read=(states.INIT, states.UNKNOWN,)),
'Actual current of the source (beam).',
disallowed_read=(states.INIT, states.UNKNOWN,)),
'Indicate if all iterlocks are closed',
disallowed_read=(states.INIT, states.UNKNOWN,)),
properties = {
'iodev': Prop(subdev, 'I/O device name.'),
'mapping': Prop(value_mapping, '', default=[
'0: off', '1: on'
'Target current in uA when source is running, default is the '
'maximum supported by the device.',
'Target voltage in kV when source is running, default is the '
'maximum supported by the device.',
iodev_defaults = {
'class': 'StringIO',
'baudrate': 38400,
supported_models = set(['DXS11-5025'])
_targetI = 0
_targetU = 0
_I = 0
_U = 0
_state = None
_doorsclosed = None
_status = "" # Infocus/Warmup
def init(self):
self._iodev = self.connect_client('iodev', self.iodev, 'StringIO')
self.meta = {}
lastkey = None
# Parse hello string
resp = iter(self._iodev.Communicate('HELLO')[9:][:-1].split())
for k in resp:
if v := next(resp, None):
if lastkey and k == "S/N":
k = f"{lastkey} SN"
lastkey = k
self.meta[k] = v
if self.meta["PS"] not in self.supported_models:
raise UnrecognizedHardware(f'Model {self.meta["PS"]} not supported!')
# set correct target parameters
# ask for values
def Reset(self):
# set correct target parameters
self._iodev.Communicate('BEAM {self.targetcurrent}')
self._iodev.Communicate('KV {self.targetvoltage}')
# This should be called periodically
def state(self):
# Shows how to handle status updates, if the device supports
# reading status information.
res = self._iodev.Communicate('STATUS')[2:]
# parse status string
# 'STATUS Off HV .00 0.00 Beam 0.0 0.0 Safe Infocus'
res = res.split()
_, state, _, self._U, self._targetU, _, self._I, self._targetI, \
self._doorsclosed, self._status = res.split()
# parse values
self._state = {'On': 1, 'Off': 0}.get(res[1])
self._U = float(res[3])
self._targetU = float(res[4])
self._I = float(res[6])
self._targetI = float(res[7])
self._doorsclosed = {'Safe': True, 'Unsafe': False}.get(res[8])
self._status = res[8]
if self._targetI != self.targetcurrent:
return FAULT, f'Wrong target current {self._targetI} != {self.targetcurrent}'
if self._targetU != self.targetvoltage:
return FAULT, f'Wrong target voltage {self._targetU} != {self.targetvoltage}'
if self._status == 'Infocus' and self._state == False:
return ON, 'idle'
elif self._status == 'Infocus' and self._state == True:
return ON, 'running'
elif self._status == 'Warmup' and self._state == False:
return ON, 'cooling down'
elif self._status == 'Warmup' and self._state == True:
return ALARM, 'warming up'
self.log.warning(f'unknown status {self._status}')
return ALARM, 'unknown status {self._status}'
def read_voltage(self):
return self._U
def get_voltage_unit(self):
return 'kV'
def read_current(self):
return self._I
def get_current_unit(self):
return 'uA'
def read_doorclosed(self):
return self._doorsclosed
def Stop(self):
self._iodev.Communicate('XRAY OFF')
def read_value(self):
return self._state
def write_value(self, value):
self._iodev.Communicate(f'XRAY {value}')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment