Training: Channel Access with cothread#

Author:

Michael Abbott

Documentation:

file:///dls_sw/prod/common/python/RHEL6-x86_64/cothread/2-15/docs/html/index.html

https://cothread.readthedocs.io

Sources:

file:///dls_sw/prod/common/python/RHEL6-x86_64/cothread/2-15 dls-controls/cothread

Development version:

file:///home/mga83/epics/cothread

Cothread#

The cothread library provides EPICS “Channel Access” bindings for Python. The library comprises two parts:

  • cothread itself: “cooperative threads”.

  • cothread.catools provides channel access bindings.

Because EPICS involves communication with other machines events may happen at any time. The cothread library provides a mechanism for managing these updates with the minimum of interference with the rest of the program.

Cothread catools bindings#

The EPICS Channel Access Python interface consists of three functions:

caget(pvs, ...)

Retrieves value from a single PV or a list of PVs.

caput(pvs, values, ...)

Writes value or values to a single PV or list of PVs.

camonitor(pvs, callback, ...)

Creates “subscription” with updates every time a PV changes: callback is called with a new value every time any listed PV updates.

Preliminaries#

Need to import cothread, all of the examples will start with the following code, 2.15 is the current release:

from pkg_resources import require
require('cothread==2.15')

import cothread
from cothread.catools import *

Channel access waveform data is returned as numpy arrays, so it will be convenient to include this in our list of imports:

import numpy

Example: Printing a PV#

Calling caget with a PV name returns the value of the PV.

print caget('SR-DI-DCCT-01:SIGNAL')

Calling caget with a list of PV names returns a list of PV values.

bpms = ['SR%02dC-DI-EBPM-%02d:SA:X' % (c+1, n+1)
    for c in range(24) for n in range(7)]
sax = caget(bpms)
print sax

Note that calling caget with a long list is potentially much faster than calling caget on each element of the list in turn, as passing a list to caget allows all fetches to proceed concurrently.

Testing Speed of caget#

Compare:

import time
start = time.time(); caget(bpms); print time.time() - start

with

start = time.time(); [caget(bpm) for bpm in bpms]; print time.time() - start

I tend to get a difference of a factor of around 50 between these two tests.

Exercise: Timing test#

Exercise: put everything in a file and run this test standalone. Try adding more PVs and making the test controlled by a command line parameter.

Shape Polymorphism#

As we’ve already seen, the behaviour of the three channel access functions varies depending on whether the first argument is a string or a list of strings, for example:

print caget('SR-DI-DCCT-01:SIGNAL')
print caget(['SR-DI-DCCT-01:SIGNAL', 'SR-DI-DCCT-01:LIFETIME'])

Similarly caput can write to multiple pvs, in which case a single value or a list of values can be passed.

caput(['TEST:PV1', 'TEST:PV2'], 3)              # Writes 3 to both
caput(['TEST:PV1', 'TEST:S'], [10, 'testing'])  # Writes different values

# To write a repeated array to multiple TEST:PVs need to use repeat_value
caput(['TEST:WF1', 'TEST:WF2'], [1, 2], repeat_value = True)

Note: this is the only case where repeat_value=True is needed.

Example: Monitoring a PV#

Updates to a PV result in a callback function being called.

def print_update(value):
    print value.name, value

m = camonitor('SR-DI-DCCT-01:SIGNAL', print_update)
cothread.Sleep(10)
m.close()

Updates arrive in the background until we close the monitor, but for normal applications we leave the monitor open until the application exits.

camonitor for Lists of PVs#

If camonitor is passed a list of PVs it expects the update function to take a second argument which is used as an index.

def print_update(value, index):
    print value.name, index, value

mm = camonitor(bpms, print_update)
cothread.Sleep(1)
for m in mm:
    m.close()

If the index is not needed there is no particular benefit to calling camonitor on lists of PVs, unlike for caget and (it depends) caput.

Exercise: camonitor and caput#

Use camonitor and caput to monitor TEST:PV1 and add 1 to it after a couple of seconds.

Use cothread.Sleep(...) for sleeping.

Warning

Don’t use time.sleep(...) when using cothread: this will prevent updates from taking place!

Use cothread.WaitForQuit() at the end of your script if there’s nothing else to do while cothread does its work.

A note on the last exercise#

The obvious answer is to call cothread.Sleep in the camonitor callback function, eg:

def do_update(value):
    cothread.Sleep(1)
    caput(value.name, value + 1)

m = camonitor('TEST:PV1', do_update)
cothread.Sleep(10)
m.close()

Unfortunately doing this has the unfortunate side effect of blocking all other camonitor updates during the Sleep.

Only one camonitor callback function is processed at a time. If you need to do long lasting work in response to a PV update, push the processing somewhere else with Spawn or Event.

Augmented Values#

Values returned by caget and delivered by camonitor are “augmented” by extra information. The following two fields are always present:

.name

Contains the full name of the requested PV.

.ok

Will be True for values fetched without trouble, False if value is not really a value!

For example:

v = caget('SR-DI-DCCT-01:SIGNAL')
print v.name, v.ok, v

Augmented Values are Ordinary#

Note that v (from above) is an ordinary number:

print isinstance(v, float)

However, it’s not completely ordinary:

print type(v), type(1.0)

It will behave just like an ordinary float, but can be made completely ordinary with the + operator:

print +v, type(+v)

However you should not normally need to use this!

Getting Values with Timestamps#

It is possible to get timestamp information with a retrieved or monitored PV, but it needs to be requested:

v = caget('SR-DI-DCCT-01:SIGNAL', format = FORMAT_TIME)
print v.name, v, v.datetime

The timestamp is also available in raw Unix format (seconds since 1970):

print v.timestamp

Example: Gathering Updates#

We’ll monitor a requested number of updates and gather them into a list. For this example the state is held in the local variables of the gather function: note the use of a nested function.

def gather(pv, count):
    values = []
    done = cothread.Event()
    def update(value):
        values.append(value)
        if len(values) >= count:
            done.Signal()
    m = camonitor(pv, update)
    done.Wait()
    m.close()
    return values

print gather('SR21C-DI-DCCT-01:SIGNAL', 10)

Example: Circular Updates Buffer#

Let’s try a different version with a circular buffer. In this case we need a class because the buffer will remain in existence for longer.

class Gather:
    def __init__(self, pv, count):
        self.count = count
        self.values = [0] * count
        self.inptr = 0
        camonitor(pv, self.update)
    def update(self, value):
        self.values[self.inptr] = value
        self.inptr = (self.inptr + 1) % self.count
    def get(self):
        return self.values[self.inptr:] + self.values[:self.inptr]

buf = Gather('SR21C-DI-DCCT-01:SIGNAL', 10)
cothread.Sleep(1)
print buf.get()
cothread.Sleep(5)
print buf.get()

This buffer can now safely be left running and will at any time return the last count values received.

Example: Gathering Arrays#

Working with numpy arrays can be much more efficient than working with Python lists of values:

x = numpy.array(caget(bpms))
print x.mean(), x.std()
print x - x.mean()

However, when it matters, timestamp information and other extended attributes are lost when gathering values into arrays, so if the timestamps are needed a little more care is required:

rawx = caget(bpms, format = FORMAT_TIME)
x = numpy.array(rawx)
tx = numpy.array([v.timestamp for v in rawx])
print tx.max() - tx.min()   # Check spread of timestamps

Example: Default Error Handling#

For caget we only really need to worry about fetching PVs that don’t exist, for camonitor we may also need to pay attention to PVs becoming disconnected. The default behaviour of cothread produces sensible results, but this can be overridden.

This behaviour of raising an exception when caget and caput fails is the best default behaviour, because in routine naive use if a PV is unavailable then this is an unrecoverable error. However, this isn’t always what we want.

Adjusting the Timeout#

caget('bogus')

This raises an exception after five seconds. The timeout can be adjusted with an explicit argument:

caget('bogus', timeout = 1)

Alternatively, when fetching very large numbers of PVs through the gateway it can happen that the default five second timeout isn’t long enough.

Catching Errors from caget#

We can ask caget to return an error value instead of raising an exception:

v1, v2 = caget(['bogus', 'SR-DI-DCCT-01:SIGNAL'], throw = False)
print v1.ok, v2.ok

Note that if a pv is not ok we can’t test for things like timestamps:

v = caget('bogus', format = FORMAT_TIME, throw = False)
print v.name, v.ok
print v.datetime

This raises an exception when trying to interrogate the datetime field on a PV that never arrived!

Catching Errors from caput#

The same applies to caput. We’ll try writing to a PV we can write to, one we can’t, and one that doesn’t exist:

pvs = ['TEST:PV1', 'bogus', 'SR-DI-DCCT-01:SIGNAL']
results = caput(pvs, 1, timeout = 1, throw = False)
for result in results:
    print result.name, result.ok
    print result

Note that a complete description of the error is in the failing results: in this case each result is a catchable exception object with a descriptive error message as its string representation.

Cothread and Qt#

The cothread library relies on cooperative transfer of control between cothreads, and similarly Qt has its own mechanism of events and notifications. For cothread and Qt to work together, these two libraries need to be introduced to each other.

Fortunately this is easy. Run:

cothread.iqt()

It’s safest to run this before importing any Qt dependent libraries.

This function will create and return the Qt application object if you need it.

Plotting: Preamble#

A certain amount of boilerplate preamble is required to get interactive plotting working with dls-python. We’ll show the complete set:

from pkg_resources import require
require('cothread==2.15')
require('matplotlib')

import cothread
from cothread.catools import *

import numpy

cothread.iqt()

import pylab
pylab.ion()

Plotting: An Example#

Now we can fetch and plot a waveform:

wfx = caget('SR-DI-EBPM-01:SA:X')
ll = pylab.plot(wfx)

Now let’s make it update continuously:

def update_ll(wfx):
    ll[0].set_ydata(wfx)
    pylab.draw()

m = camonitor('SR-DI-EBPM-01:SA:X', update_ll)

Exercise: Plot both X and Y on the same graph. Hint: SR-DI-EBPM-01:BPMID contains a good x-axis.

Advanced Topics#

Advanced Topic: Cothreads#

We’ve already seen cothreads: camonitor callbacks occur “in the background”, really they occur on a dedicated cothread.

Cothreads are cooperative “threads”, which means a cothread will run until it deliberately relinquishes control. This has advantages and disadvantages:

Advantage

No locking is required, a cothread will not run when it’s not expected.

Disadvantage

A cothread that won’t relinquish control will block all other cothreads.

Note that Python can’t make use of multiple cores.

Creating a Cothread#

Creating a cothread is very easy: just define the function to run in the cothread and call cothread.Spawn:

running = True
def background(name, sleep=1):
    while running:
        print 'Hello from', name
        cothread.Sleep(sleep)

cothread.Spawn(background, 'test1')
cothread.Spawn(background, 'test2', sleep = 2)

cothread.Sleep(10)
running = False

cothread.Sleep(3)

Communicating with Cothreads#

Communicate using event objects and queues:

cothread.Event()

Creates an event object which is either ready or not ready. When it’s ready it has a value.

cothread.EventQueue()

Almost exactly like an event object, but multiple values can be waiting.

Both objects support two methods:

event.Signal(value)

Makes event object ready with given value.

event.Wait(timeout=None)

Suspends cothread until object is ready, consumes and returns value.

Example: Using Event#

class PV:
    def __init__(self, pv):
        self.event = cothread.Event()
        camonitor(pv, self.__on_update, format = FORMAT_TIME)
    def __on_update(self, value):
        self.event.Signal(value)
    def get(self, timeout=None):
        return self.event.Wait(timeout)

import time
pv = PV('SR21C-DI-DCCT-01:SIGNAL')
for n in range(5):
    v = pv.get()
    print v.timestamp, time.time()
    cothread.Sleep(1)

Note that we always get the latest value, even though the PV updates at 5Hz.

Cothread already implements a fuller featured version of this class available as cothread.pv.PV, and another variant cothread.pv.PV_array.

Cothread Suspension Points#

The following functions are cothread suspension points (where control can be yielded to another cothread):

  • Sleep(), SleepUntil(), Yield()

  • event.Wait()

  • caget()

  • caput() most of the time (see documentation to avoid suspension).

The following cothread modules provide extra cothread aware suspension points, see documentation for details:

  • cothread.coselect: provides select() and poll functionality.

  • cothread.cosocket: provides cothread aware socket API.

Cothreads and Real Threads#

Python threads are created with the threading.Thread class. A Python thread cannot safely call cothread methods … except for cothread.Callback(), which arranges for its argument to be called in a cothread:

def callback_code(n):
    print 'cothread tick', n

import time
def thread_code(count):
    for n in range(count):
        print 'thread tick', n
        cothread.Callback(callback_code, n)
        time.sleep(0.5)

import thread
thread.start_new_thread(thread_code, (5,))
cothread.Sleep(5)

Slightly more Realistic Version#

def consumer(event):
    while True:
        n = event.Wait()
        print 'consumed', n

import time
def producer(event, count):
    for n in range(count):
        print 'thread tick', n
#        event.Signal(n)
        cothread.Callback(lambda n: event.Signal(n), n)
        time.sleep(0.5)

import thread
event = cothread.Event()
cothread.Spawn(consumer, event)
thread.start_new_thread(producer, (event, 5))
cothread.Sleep(5)

Try replacing the Callback call with a direct Signal call and see what happens.

Bonus question: what’s wrong with this?

cothread.Callback(lambda: event.Signal(n))