ReadTopic

class lsst.ts.salobj.topics.ReadTopic(*, salinfo, name, sal_prefix, max_history, queue_len=100)

Bases: lsst.ts.salobj.topics.base_topic.BaseTopic

Base class for reading a topic.

Parameters:
salinfo : SalInfo

SAL component information

name : str

Topic name, without a “command_” or “logevent_” prefix.

sal_prefix : str

SAL topic prefix: one of “command_”, “logevent_” or “”

max_history : int

Maximum number of historical items to read:

  • 0 is required for commands and the ackcmd reader
  • 1 is recommended for events and telemetry
queue_len : int (optional)

The maximum number of items that can be read and not dealt with by a callback function or next before older data will be dropped.

Raises:
ValueError

If max_history < 0.

ValueError

If max_history > 0 and the topic is volatile (command or ackcmd).

ValueError

If queue_len <= 0.

ValueError

If max_history > queue_len.

Notes

Queues

There are actually two queues: an internal queue whose length is set by queue_len and a dds queue whose length is set by low level configuration. Data can be lost in two ways:

  • If this class cannot read data from the dds queue fast enough, then older data will be dropped from the dds queue. You will get a warning log message if the reader starts to fall behind.
  • As data is read it is put on the internal queue. if a callback function or next does not process data quickly enough then older data is dropped from the internal queue. If you have a callback function you will get several warning log messages as this internal queue fills up. You get no warning otherwise because this class has no way of knowing whether or not you intend to read all data using next.

Reading

Reading is performed by the SalInfo which has single read loop that reads all topics. This is more efficient than having each ReadTopic read its own data.

Attributes:
isopen : bool

Is this read topic open? True until close is called.

dds_queue_length_checker : QueueCapacityChecker

Queue length checker for the DDS queue.

python_queue_length_checker : QueueCapacityChecker:

Queue length checker for the Python queue.

Attributes Summary

DataType The class of data for this topic.
allow_multiple_callbacks Can callbacks can run simultaneously?
callback Callback function, or None if there is not one.
has_callback Return True if there is a callback function.
has_data Has any data been seen for this topic?
max_history
metadata Get topic metadata as a TopicMetadata, if available, else None.

Methods Summary

aget([timeout]) Get the current value, if any, else wait for the next value.
close() Shut down and release resources.
flush() Flush the queue of unread data.
get([flush]) Get the most recently seen value, or None if no data ever seen.
get_oldest() Pop and return the oldest value from the queue, or None if the queue is empty.
next(*, flush[, timeout]) Wait for a value, possibly returning the oldest queued value.

Attributes Documentation

DataType

The class of data for this topic.

allow_multiple_callbacks

Can callbacks can run simultaneously?

Notes

Ignored for synchronous callbacks because those block while running. In particular, if the callback is synchronous but launches one or more background jobs then the number of those jobs cannot be limited by this class.

callback

Callback function, or None if there is not one.

The callback function is called when new data is received; it receives one argument: the data.

Raises:
TypeError

When setting a new callback if the callback is not None and is not callable.

Notes

The callback function can be synchronous or asynchronous (e.g. defined with async def).

Setting a callback flushes the queue, and it will remain empty as long as there is a callback.

get_oldest and next are prohibited if there is a callback function. Technically they could both work, but get_oldest would always return None and next would miss data if it arrived while waiting for something else. It seemed safer to just raise an exception.

has_callback

Return True if there is a callback function.

has_data

Has any data been seen for this topic?

max_history
metadata

Get topic metadata as a TopicMetadata, if available, else None.

Methods Documentation

aget(timeout=None)

Get the current value, if any, else wait for the next value.

Parameters:
timeout : float (optional)

Time limit, in seconds. If None then no time limit.

Returns:
data : DataType

The current or next value.

Raises:
RuntimeError

If a callback function is present.

Notes

Do not modify the returned data. To make a copy that you can modify use copy.copy(value).

close()

Shut down and release resources.

Intended to be called by SalInfo.close(), since that tracks all topics.

flush()

Flush the queue of unread data.

Raises:
RuntimeError

If a callback function is present.

get(flush=True)

Get the most recently seen value, or None if no data ever seen.

Parameters:
flush : bool (optional)

Flush the queue? Defaults to True for backwards compatibility. This only affects the next value returned by next and is ignored if there is a callback function.

Returns:
data : self.DataType or None

Return self.data if data has been read, else None.

get_oldest()

Pop and return the oldest value from the queue, or None if the queue is empty.

Returns:
data : self.DataType or None

The oldest value found on the queue, if any, else None.

Raises:
RuntimeError

If a callback function is present.

Notes

Use with caution when mixing with next, since that also consumes data from the queue.

next(*, flush, timeout=None)

Wait for a value, possibly returning the oldest queued value.

Parameters:
flush : bool

If True then flush the queue before starting a read. If False then pop and return the oldest value from the queue, if any, else wait for new data.

timeout : float (optional)

Time limit, in seconds. If None then no time limit.

Returns:
data : DataType

The data.

Raises:
RuntimeError

If a callback function is present.

Notes

Do not modify the returned data. To make a copy that you can modify use copy.copy(value).