Training: Channel Access with cothread
#
- Author:
Michael Abbott
- Documentation:
file:///dls_sw/prod/common/python/RHEL6-x86_64/cothread/2-15/docs/html/index.html
- Sources:
file:///dls_sw/prod/common/python/RHEL6-x86_64/cothread/2-15 dls-controls/cothread
- Development version:
Cothread#
The cothread
library provides EPICS “Channel Access” bindings for Python.
The library comprises two parts:
cothread
itself: “cooperative threads”.cothread.catools
provides channel access bindings.
Because EPICS involves communication with other machines events may happen at
any time. The cothread
library provides a mechanism for managing these
updates with the minimum of interference with the rest of the program.
Cothread catools bindings#
The EPICS Channel Access Python interface consists of three functions:
caget(pvs, ...)
Retrieves value from a single PV or a list of PVs.
caput(pvs, values, ...)
Writes value or values to a single PV or list of PVs.
camonitor(pvs, callback, ...)
Creates “subscription” with updates every time a PV changes:
callback
is called with a new value every time any listed PV updates.
Preliminaries#
Need to import cothread
, all of the examples will start with
the following code, 2.15 is the current release:
from pkg_resources import require
require('cothread==2.15')
import cothread
from cothread.catools import *
Channel access waveform data is returned as numpy
arrays, so it will be
convenient to include this in our list of imports:
import numpy
Example: Printing a PV#
Calling caget
with a PV name returns the value of the PV.
print caget('SR-DI-DCCT-01:SIGNAL')
Calling caget
with a list of PV names returns a list of PV values.
bpms = ['SR%02dC-DI-EBPM-%02d:SA:X' % (c+1, n+1)
for c in range(24) for n in range(7)]
sax = caget(bpms)
print sax
Note that calling caget
with a long list is potentially much faster than
calling caget
on each element of the list in turn, as passing a list to
caget
allows all fetches to proceed concurrently.
Testing Speed of caget
#
Compare:
import time
start = time.time(); caget(bpms); print time.time() - start
with
start = time.time(); [caget(bpm) for bpm in bpms]; print time.time() - start
I tend to get a difference of a factor of around 50 between these two tests.
Exercise: Timing test#
Exercise: put everything in a file and run this test standalone. Try adding more PVs and making the test controlled by a command line parameter.
Shape Polymorphism#
As we’ve already seen, the behaviour of the three channel access functions varies depending on whether the first argument is a string or a list of strings, for example:
print caget('SR-DI-DCCT-01:SIGNAL')
print caget(['SR-DI-DCCT-01:SIGNAL', 'SR-DI-DCCT-01:LIFETIME'])
Similarly caput
can write to multiple pvs, in which case a single value or a
list of values can be passed.
caput(['TEST:PV1', 'TEST:PV2'], 3) # Writes 3 to both
caput(['TEST:PV1', 'TEST:S'], [10, 'testing']) # Writes different values
# To write a repeated array to multiple TEST:PVs need to use repeat_value
caput(['TEST:WF1', 'TEST:WF2'], [1, 2], repeat_value = True)
Note: this is the only case where repeat_value=True
is needed.
Example: Monitoring a PV#
Updates to a PV result in a callback function being called.
def print_update(value):
print value.name, value
m = camonitor('SR-DI-DCCT-01:SIGNAL', print_update)
cothread.Sleep(10)
m.close()
Updates arrive in the background until we close the monitor, but for normal applications we leave the monitor open until the application exits.
camonitor
for Lists of PVs#
If camonitor
is passed a list of PVs it expects the update function to take
a second argument which is used as an index.
def print_update(value, index):
print value.name, index, value
mm = camonitor(bpms, print_update)
cothread.Sleep(1)
for m in mm:
m.close()
If the index is not needed there is no particular benefit to calling
camonitor
on lists of PVs, unlike for caget
and (it depends)
caput
.
Exercise: camonitor
and caput
#
Use camonitor
and caput
to monitor TEST:PV1
and add 1 to it after a
couple of seconds.
Use cothread.Sleep(...)
for sleeping.
Warning
Don’t use time.sleep(...)
when using cothread: this will prevent updates
from taking place!
Use cothread.WaitForQuit()
at the end of your script if there’s nothing else
to do while cothread does its work.
A note on the last exercise#
The obvious answer is to call cothread.Sleep
in the camonitor callback
function, eg:
def do_update(value):
cothread.Sleep(1)
caput(value.name, value + 1)
m = camonitor('TEST:PV1', do_update)
cothread.Sleep(10)
m.close()
Unfortunately doing this has the unfortunate side effect of blocking
all other camonitor updates during the Sleep
.
Only one camonitor callback function is processed at a time. If you need to do
long lasting work in response to a PV update, push the processing somewhere else
with Spawn
or Event
.
Augmented Values#
Values returned by caget
and delivered by camonitor
are “augmented” by
extra information. The following two fields are always present:
.name
Contains the full name of the requested PV.
.ok
Will be
True
for values fetched without trouble,False
if value is not really a value!
For example:
v = caget('SR-DI-DCCT-01:SIGNAL')
print v.name, v.ok, v
Augmented Values are Ordinary#
Note that v
(from above) is an ordinary number:
print isinstance(v, float)
However, it’s not completely ordinary:
print type(v), type(1.0)
It will behave just like an ordinary float, but can be made completely
ordinary with the +
operator:
print +v, type(+v)
However you should not normally need to use this!
Getting Values with Timestamps#
It is possible to get timestamp information with a retrieved or monitored PV, but it needs to be requested:
v = caget('SR-DI-DCCT-01:SIGNAL', format = FORMAT_TIME)
print v.name, v, v.datetime
The timestamp is also available in raw Unix format (seconds since 1970):
print v.timestamp
Example: Gathering Updates#
We’ll monitor a requested number of updates and gather them into a list. For
this example the state is held in the local variables of the gather
function: note the use of a nested function.
def gather(pv, count):
values = []
done = cothread.Event()
def update(value):
values.append(value)
if len(values) >= count:
done.Signal()
m = camonitor(pv, update)
done.Wait()
m.close()
return values
print gather('SR21C-DI-DCCT-01:SIGNAL', 10)
Example: Circular Updates Buffer#
Let’s try a different version with a circular buffer. In this case we need a class because the buffer will remain in existence for longer.
class Gather:
def __init__(self, pv, count):
self.count = count
self.values = [0] * count
self.inptr = 0
camonitor(pv, self.update)
def update(self, value):
self.values[self.inptr] = value
self.inptr = (self.inptr + 1) % self.count
def get(self):
return self.values[self.inptr:] + self.values[:self.inptr]
buf = Gather('SR21C-DI-DCCT-01:SIGNAL', 10)
cothread.Sleep(1)
print buf.get()
cothread.Sleep(5)
print buf.get()
This buffer can now safely be left running and will at any time return the last
count
values received.
Example: Gathering Arrays#
Working with numpy
arrays can be much more efficient than working with
Python lists of values:
x = numpy.array(caget(bpms))
print x.mean(), x.std()
print x - x.mean()
However, when it matters, timestamp information and other extended attributes are lost when gathering values into arrays, so if the timestamps are needed a little more care is required:
rawx = caget(bpms, format = FORMAT_TIME)
x = numpy.array(rawx)
tx = numpy.array([v.timestamp for v in rawx])
print tx.max() - tx.min() # Check spread of timestamps
Example: Default Error Handling#
For caget
we only really need to worry about fetching PVs that don’t exist,
for camonitor
we may also need to pay attention to PVs becoming
disconnected. The default behaviour of cothread
produces sensible results,
but this can be overridden.
This behaviour of raising an exception when caget
and caput
fails is the
best default behaviour, because in routine naive use if a PV is unavailable then
this is an unrecoverable error. However, this isn’t always what we want.
Adjusting the Timeout#
caget('bogus')
This raises an exception after five seconds. The timeout can be adjusted with an explicit argument:
caget('bogus', timeout = 1)
Alternatively, when fetching very large numbers of PVs through the gateway it can happen that the default five second timeout isn’t long enough.
Catching Errors from caget
#
We can ask caget
to return an error value instead of raising an exception:
v1, v2 = caget(['bogus', 'SR-DI-DCCT-01:SIGNAL'], throw = False)
print v1.ok, v2.ok
Note that if a pv is not ok
we can’t test for things like timestamps:
v = caget('bogus', format = FORMAT_TIME, throw = False)
print v.name, v.ok
print v.datetime
This raises an exception when trying to interrogate the datetime
field on a
PV that never arrived!
Catching Errors from caput
#
The same applies to caput
. We’ll try writing to a PV we can write to, one
we can’t, and one that doesn’t exist:
pvs = ['TEST:PV1', 'bogus', 'SR-DI-DCCT-01:SIGNAL']
results = caput(pvs, 1, timeout = 1, throw = False)
for result in results:
print result.name, result.ok
print result
Note that a complete description of the error is in the failing results: in this case each result is a catchable exception object with a descriptive error message as its string representation.
Cothread and Qt#
The cothread library relies on cooperative transfer of control between cothreads, and similarly Qt has its own mechanism of events and notifications. For cothread and Qt to work together, these two libraries need to be introduced to each other.
Fortunately this is easy. Run:
cothread.iqt()
It’s safest to run this before importing any Qt dependent libraries.
This function will create and return the Qt application object if you need it.
Plotting: Preamble#
A certain amount of boilerplate preamble is required to get interactive plotting
working with dls-python
. We’ll show the complete set:
from pkg_resources import require
require('cothread==2.15')
require('matplotlib')
import cothread
from cothread.catools import *
import numpy
cothread.iqt()
import pylab
pylab.ion()
Plotting: An Example#
Now we can fetch and plot a waveform:
wfx = caget('SR-DI-EBPM-01:SA:X')
ll = pylab.plot(wfx)
Now let’s make it update continuously:
def update_ll(wfx):
ll[0].set_ydata(wfx)
pylab.draw()
m = camonitor('SR-DI-EBPM-01:SA:X', update_ll)
Exercise: Plot both X and Y on the same graph. Hint: SR-DI-EBPM-01:BPMID
contains a good x-axis.
Advanced Topics#
Advanced Topic: Cothreads#
We’ve already seen cothreads: camonitor
callbacks occur “in the background”,
really they occur on a dedicated cothread.
Cothreads are cooperative “threads”, which means a cothread will run until it deliberately relinquishes control. This has advantages and disadvantages:
- Advantage
No locking is required, a cothread will not run when it’s not expected.
- Disadvantage
A cothread that won’t relinquish control will block all other cothreads.
Note that Python can’t make use of multiple cores.
Creating a Cothread#
Creating a cothread is very easy: just define the function to run in the
cothread and call cothread.Spawn
:
running = True
def background(name, sleep=1):
while running:
print 'Hello from', name
cothread.Sleep(sleep)
cothread.Spawn(background, 'test1')
cothread.Spawn(background, 'test2', sleep = 2)
cothread.Sleep(10)
running = False
cothread.Sleep(3)
Communicating with Cothreads#
Communicate using event objects and queues:
cothread.Event()
Creates an event object which is either ready or not ready. When it’s ready it has a value.
cothread.EventQueue()
Almost exactly like an event object, but multiple values can be waiting.
Both objects support two methods:
- event
.Signal(value)
Makes event object ready with given value.
- event
.Wait(timeout=None)
Suspends cothread until object is ready, consumes and returns value.
Example: Using Event
#
class PV:
def __init__(self, pv):
self.event = cothread.Event()
camonitor(pv, self.__on_update, format = FORMAT_TIME)
def __on_update(self, value):
self.event.Signal(value)
def get(self, timeout=None):
return self.event.Wait(timeout)
import time
pv = PV('SR21C-DI-DCCT-01:SIGNAL')
for n in range(5):
v = pv.get()
print v.timestamp, time.time()
cothread.Sleep(1)
Note that we always get the latest value, even though the PV updates at 5Hz.
Cothread already implements a fuller featured version of this class available as
cothread.pv.PV
, and another variant cothread.pv.PV_array
.
Cothread Suspension Points#
The following functions are cothread suspension points (where control can be yielded to another cothread):
Sleep()
,SleepUntil()
,Yield()
event
.Wait()
caget()
caput()
most of the time (see documentation to avoid suspension).
The following cothread modules provide extra cothread aware suspension points, see documentation for details:
cothread.coselect
: providesselect()
andpoll
functionality.cothread.cosocket
: provides cothread aware socket API.
Cothreads and Real Threads#
Python threads are created with the threading.Thread
class. A Python thread
cannot safely call cothread methods … except for cothread.Callback()
,
which arranges for its argument to be called in a cothread:
def callback_code(n):
print 'cothread tick', n
import time
def thread_code(count):
for n in range(count):
print 'thread tick', n
cothread.Callback(callback_code, n)
time.sleep(0.5)
import thread
thread.start_new_thread(thread_code, (5,))
cothread.Sleep(5)
Slightly more Realistic Version#
def consumer(event):
while True:
n = event.Wait()
print 'consumed', n
import time
def producer(event, count):
for n in range(count):
print 'thread tick', n
# event.Signal(n)
cothread.Callback(lambda n: event.Signal(n), n)
time.sleep(0.5)
import thread
event = cothread.Event()
cothread.Spawn(consumer, event)
thread.start_new_thread(producer, (event, 5))
cothread.Sleep(5)
Try replacing the Callback
call with a direct Signal
call and see what
happens.
Bonus question: what’s wrong with this?
cothread.Callback(lambda: event.Signal(n))