Commit 0366847c authored by Georg Brandl's avatar Georg Brandl Committed by cermak
Browse files

Mecademic beta

Change-Id: I5ed2c1abfbfed25c2bc7c2c941f074f6fcaf79e7
parent ea9b31a2
......@@ -19,3 +19,4 @@ Miscellaneous devices
chiller
proumid
pdu
mecademic
\ No newline at end of file
Mecademic robots
----------------
.. autodev:: mecademic_robot.DiscreteOutput
.. autodev:: mecademic_robot.AnalogOutput
.. autodev:: mecademic_robot.StringIO
# -*- coding: utf-8 -*-
# *****************************************************************************
# MLZ library of Tango servers
# Copyright (c) 2015-2019 by the authors, see LICENSE
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Georg Brandl <g.brandl@fz-juelich.de>
#
# *****************************************************************************
from entangle import base
from entangle.core import Attr, Cmd, Prop, intrange, subdev, listof, \
ON, OFF, ALARM, FAULT, states
from entangle.core.errors import ConfigurationError, InvalidValue, \
CommunicationFailure, InvalidOperation
from entangle.device.network import StringIO as NetworkStringIO
from entangle.lib.sequence import SequencerMixin, Step
disallow_states = (states.FAULT, states.INIT, states.BUSY, states.UNKNOWN)
class StringIO(NetworkStringIO):
"""Special network StringIO device to be used for the robot."""
def init(self):
NetworkStringIO.init(self)
# This cannot be set via Tango, because DevString can't contain NULs.
NetworkStringIO.write_endOfLine(self, '\x00')
def write_endOfLine(self, value):
raise RuntimeError('setting eol not supported for this device')
class DiscreteOutput(SequencerMixin, base.DiscreteOutput):
"""Control a Mecademic Meca500 6-axis robot arm in terms of defined
*positions* (values of the 6 joints) and *transitions* (movement commands
that define how to get from one position to the other).
"""
properties = {
'iodev': Prop(subdev, 'I/O device name.'),
'initcmds': Prop(str,
'Motion commands to run at init time (usually '
'setting up the allowed speeds.'),
'posnames': Prop(listof(str),
'Names for the known positions given in '
'"positions".'),
'positions': Prop(listof(float), 'Known poses in the form '
'[x, y, z, a1, a2, a3, c1, c2, c3, ...].'),
'transitions': Prop(listof(str),
'Motion commands for transitioning between '
'known positions (given by names) in the form '
'["trans1pos1", "trans1pos2", "command list", '
'"trans2pos1", "trans2pos2", "command list", '
'...].'),
}
commands = {
'Grip': Cmd('Close or open the gripper.',
bool, None, 'True (close) or False (open).', '',
disallowed=disallow_states),
'HasGrip': Cmd('True if robot is gripping something.',
None, bool, '', '',
disallowed=disallow_states),
'DriveTo': Cmd('Drive joints directly to a given position.',
int, None, 'The position index, as defined in the '
'properties.', '',
disallowed=disallow_states),
'DriveToNamed': Cmd('Drive joints directly to a given named position.',
str, None, 'The position, as defined in the '
'posnames properties.', '',
disallowed=disallow_states),
'Exec': Cmd('Execute arbitrary movement commands.',
str, None, 'The movement commands, separated by '
'newlines or semicolons.', '',
disallowed=disallow_states),
}
attributes = {
# TODO: allow reading while BUSY
'joints': Attr(float, 'Positions of the three joints.',
dims=1, max_x=6, writable=True,
disallowed_read=disallow_states,
disallowed_write=disallow_states),
}
iodev_defaults = {
'class': 'StringIO',
'port': 10000,
'timeout': 15, # TODO: get rid of this requirement for Activate+Home
}
def init(self):
self._pos = []
self._namedpos = {}
self._trans = {}
self._check_positions()
self._attr_vals['value'] = intrange(0, len(self._pos) - 1)
self.init_sequencer(state_on_stop=ON)
self._iodev = self.connect_client('iodev', self.iodev, 'StringIO')
try:
self._communicate('SetEOB(1)', 2054)
self._communicate('SetEOM(0)', 2053)
self._iodev.Flush()
except InvalidOperation:
# TODO: happens if robot is in error...
pass
if self.initcmds and self.state()[0] != FAULT:
self._drive(self._prepare_cmds(self.initcmds))
self.seq_join()
def _check_positions(self):
vias = {}
if len(self.positions) % 9 != 0:
raise ConfigurationError('"positions" property must have a '
'multiple of 9 items')
if 9 * len(self.posnames) != len(self.positions):
raise ConfigurationError('"positions" must have the same length '
'as "posnames" times 9')
if len(set(self.posnames)) != len(self.posnames):
raise ConfigurationError('all "posnames" must be distinct')
if len(self.transitions) % 4 != 0:
raise ConfigurationError('"transitions" property must have a '
'multiple of 4 items')
for i in range(0, len(self.positions), 9):
self._pos.append(self.positions[i:i+9])
for (i, posname) in enumerate(self.posnames):
self._namedpos[posname] = i
for i in range(0, len(self.transitions), 4):
fr, to, cmds, reflex = self.transitions[i:i+4]
if fr not in self._namedpos:
raise ConfigurationError('named position "%s" does not exist'
% fr)
if to not in self._namedpos:
raise ConfigurationError('named position "%s" does not exist'
% to)
if fr == to:
raise ConfigurationError('positions in transition must be '
'different')
fr = self._namedpos[fr]
to = self._namedpos[to]
if cmds.startswith('via '):
try:
vias[fr, to] = [self._namedpos[x]
for x in cmds[4:].split(',')]
except KeyError:
raise ConfigurationError('via entry in "transitions" must '
'be like "via p1,p2,p3"')
if reflex:
vias[to, fr] = list(reversed(vias[fr, to]))
else:
self._trans[fr, to] = self._prepare_cmds(cmds, to)
if reflex:
self._trans[to, fr] = self._prepare_cmds(cmds, fr)
for (fr, to), via in vias.items():
pos = fr
cmds = []
for pnext in via + [to]:
if (pos, pnext) not in self._trans:
raise ConfigurationError('via: no defined transition from '
'%d to %d' % (pos, pnext))
cmds.extend(self._trans[pos, pnext])
pos = pnext
self._trans[fr, to] = cmds
for posid in range(len(self._pos)):
if not any(fr == posid for (fr, to) in self._trans):
raise ConfigurationError('no defined transition from '
'position %d' % posid)
if not any(to == posid for (fr, to) in self._trans):
raise ConfigurationError('no defined transition to '
'position %d' % posid)
def _prepare_cmds(self, string, tgt=None):
if tgt is not None:
tgtstr = ','.join(map(str, self._pos[tgt][:-3]))
string = string.replace('%tgt', tgtstr)
return [x.strip() for x in string.replace(';', '\n').splitlines()]
def _communicate(self, cmd, expected_reply):
res = self._iodev.Communicate(cmd)
return self._decode(res, expected_reply)
def _decode(self, res, expected_reply):
try:
code, text = res.strip('[]').split('][')
code = int(code)
except ValueError:
raise CommunicationFailure('Invalid reply from robot: %r' % res)
if code == expected_reply:
return text
raise InvalidOperation('Error %s: %s' % (code, text))
def _robot_state(self):
res = self._communicate('GetStatusRobot', 2007)
return [int(x) for x in res.split(',')]
def _get_list(self, cmd, expected_reply):
return [float(x) for x in
self._communicate(cmd, expected_reply).split(',')]
def _drive(self, cmds):
seq = []
seq.append(Step('send commands', 0, self._step_drive_cmds, cmds))
seq.append(Step('wait for completion', 0.1, self._step_wait))
self.start_sequence(seq)
def _step_drive_cmds(self, _store, cmds):
self._communicate('ClearMotion', 2044)
for cmd in cmds:
self._iodev.WriteLine(cmd)
self._communicate('ResumeMotion', 2043)
def _step_wait(self, _store):
if not self._iodev.availableLines:
return True
self._decode(self._iodev.ReadLine(), 3012)
return False
def _ext_state(self):
act, homed, sim, error = self._robot_state()[:4]
const, text = ON, 'active'
if not homed:
const = ALARM
text += ', not homed'
if sim:
const = ALARM
text += ', simulation active'
if not act:
const = OFF
text = 'not activated'
if error:
const = FAULT
text += ', error (please reset)'
return const, text
def _gripper(self):
res = self._communicate('GetStatusGripper', 2079)
gripstates = [int(x) for x in res.split(',')]
if not gripstates[0]: # disabled
return 0
return gripstates[2]
def _get_pos(self):
cur = self._get_list('GetPose', 2027) + self._get_list('GetConf', 2029)
for posid, pvals in enumerate(self._pos):
for p1, p2 in zip(cur, pvals):
if abs(p1 - p2) > 0.001:
if abs(p1 - p2) != 360: # -180/180 angle equality
break
else:
return posid
return -1
def read_value(self):
if self.seq_is_alive():
return -1 # moving
return self._get_pos()
def write_value(self, value):
cur = self._get_pos()
if cur == -1:
raise InvalidOperation('Not at defined position, cannot move '
'automatically')
if cur == value:
return
if (cur, value) not in self._trans:
raise InvalidValue('No transition defined between robot '
'positions %s and %s' %
(self.posnames[cur], self.posnames[value]))
self._drive(self._trans[cur, value])
def Stop(self):
self._communicate('ClearMotion', 2044)
def Reset(self):
if self._robot_state()[3]:
self._communicate('ResetError', 2005)
if not self._robot_state()[0]:
self._communicate('ActivateRobot', 2000)
if not self._robot_state()[1]:
self._communicate('Home', 2002)
self._communicate('SetEOB(1)', 2054)
self._communicate('SetEOM(0)', 2053)
if self.initcmds:
self._drive(self._prepare_cmds(self.initcmds))
self.seq_join()
def Off(self):
self._communicate('DeactivateRobot', 2004)
def On(self):
self._communicate('ActivateRobot', 2000)
def get_joints_unit(self):
return 'deg'
def read_joints(self):
return self._get_list('GetJoints', 2026)
def write_joints(self, value):
if len(value) != 6:
raise InvalidValue('Need 6 joint values')
# TODO: check limits
self._drive(['MoveJoints(%s)' % ','.join(map(str, value))])
def DriveTo(self, posid):
pos = self._pos[posid]
self._drive(['SetConf(%s)' % ','.join(str(int(x)) for x in pos[-3:]),
'MovePose(%s)' % ','.join(map(str, pos[:-3]))])
def DriveToNamed(self, posname):
self.DriveTo(self._namedpos[posname])
def Exec(self, cmds):
self._drive(self._prepare_cmds(cmds))
def Grip(self, close):
if close:
self._drive(['GripperClose'])
else:
self._drive(['GripperOpen'])
def HasGrip(self):
return self._gripper()
class AnalogOutput(base.AnalogOutput):
"""Control a single joint of the 6-axis robot arm."""
properties = {
'iodev': Prop(subdev, 'The DiscreteOutput robot device.'),
'joint': Prop(intrange(1, 6), 'Joint index.'),
}
# TODO: get absmin/max from hardware
def init(self):
self._iodev = self.connect_client('iodev', self.iodev, 'DiscreteOutput')
def read_value(self):
return self._iodev.joints[self.joint - 1]
def write_value(self, value):
pos = self._iodev.joints
pos[self.joint - 1] = value
self._iodev.joints = pos
def get_value_unit(self):
return 'deg'
def state(self):
return self._iodev.State(), self._iodev.Status()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment