ReadTopic#

class lsst.ts.salobj.topics.ReadTopic(*, salinfo, attr_name, max_history, queue_len=100)#

Bases: BaseTopic

Base class for reading a topic.

Parameters:
  • salinfo (SalInfo) – SAL component information

  • attr_name (str) – Topic name with attribute prefix. The prefix must be one of: cmd_, evt_, tel_, or (only for the ackcmd topic) ack_.

  • max_history (int) –

    Maximum number of historical items to read:

    • 0 is required for commands, events, and the ackcmd topic.

    • 1 is recommended for telemetry. For an indexed component it is possible for data from one index to push data for another index off the DDS queue, so historical data is not guaranteed.

    • For the special case of reading an indexed SAL component with index=0 (read all indices) the only allowed values are 0 or 1. If 1 then retrieve the most recent sample for each index that is still in the read queue, in the order received. max_history > 1 is forbidden, because it is difficult to implement.

  • queue_len (int, optional) – The maximum number of messages that can be read but not yet handled by a callback function or next before older messages are dropped.
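The index=0, max_history=1 case described above can be pictured with a small stand-alone sketch. It is not the salobj implementation: the function name and the (sal_index, value) tuples are hypothetical, and the sketch only illustrates keeping the most recent sample per index while preserving arrival order.

```python
def latest_per_index(samples):
    """Return the most recent sample for each index, in the order received.

    `samples` is a hypothetical list of (sal_index, value) tuples, oldest first.
    """
    last_position = {}
    for position, (sal_index, _value) in enumerate(samples):
        # A later sample for the same index replaces the earlier one.
        last_position[sal_index] = position
    return [samples[position] for position in sorted(last_position.values())]


# Samples still in the read queue, oldest first.
queue = [(1, "a"), (2, "b"), (1, "c"), (3, "d"), (2, "e")]
history = latest_per_index(queue)
print(history)
```

Index 1 contributes "c" rather than "a" because only the latest sample per index is kept.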

Raises:
  • ValueError – If max_history < 0.

  • ValueError – If queue_len < MIN_QUEUE_LEN.

  • ValueError – If max_history > queue_len.

  • ValueError – For an indexed component, if index=0 and max_history > 1. Reading more than one historical sample per index is more trouble than it is worth.

  • UserWarning – If max_history > DDS history queue depth or DDS durability service history depth for this topic. This is a warning rather than an exception, so that the DDS quality of service can be changed without breaking existing code.
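As a rough illustration of the ValueError conditions listed above, the checks could be written along these lines. This is a sketch only, not the library's code: the function names, the use of index=None for a non-indexed component, and the value given to MIN_QUEUE_LEN are all assumptions. The UserWarning case is omitted because it depends on DDS quality-of-service settings.

```python
MIN_QUEUE_LEN = 10  # Assumed placeholder; the real constant is defined by salobj.


def check_read_topic_args(*, max_history, queue_len=100, index=None):
    """Raise ValueError for the argument combinations listed above.

    `index` is None for a non-indexed component (a convention of this sketch).
    """
    if max_history < 0:
        raise ValueError(f"max_history={max_history} must be >= 0")
    if queue_len < MIN_QUEUE_LEN:
        raise ValueError(f"queue_len={queue_len} must be >= {MIN_QUEUE_LEN}")
    if max_history > queue_len:
        raise ValueError(f"max_history={max_history} must be <= queue_len={queue_len}")
    if index == 0 and max_history > 1:
        raise ValueError("max_history must be 0 or 1 when reading all indices")


def is_valid(**kwargs):
    """Return True if the arguments pass every check, else False."""
    try:
        check_read_topic_args(**kwargs)
    except ValueError:
        return False
    return True


print(is_valid(max_history=1, index=0))
print(is_valid(max_history=2, index=0))
print(is_valid(max_history=200))
```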

isopen#

Is this read topic open? True until close or basic_close is called.

Type:

bool

python_queue_length_checker#

Queue length checker for the Python queue.

Type:

QueueCapacityChecker

Notes

There is a queue for data whose length is set by queue_len. Data can be lost from this queue: if a callback function or next does not process data quickly enough, older messages are dropped from the Python queue. If you have a callback function then you will get several warning log messages as the Python queue fills up; you get no warning otherwise, because ReadTopic has no way of knowing whether you intend to read all messages.
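The drop-oldest behavior of a bounded queue can be seen with a standard-library deque. Whether ReadTopic uses a deque internally is an assumption of this sketch, and the queue length here is far smaller than a real ReadTopic would accept.

```python
import collections

queue_len = 3  # Tiny value for illustration; the documented default is 100.
python_queue = collections.deque(maxlen=queue_len)

for message in ["m1", "m2", "m3", "m4", "m5"]:
    # When the deque is full, appending discards the oldest entry.
    python_queue.append(message)

remaining = list(python_queue)
print(remaining)
```

Messages "m1" and "m2" are gone by the time a slow reader gets to the queue, which is the kind of data loss described above.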

Reading

Reading is performed by the contained SalInfo, which has a single read loop that reads messages for all topics. This is more efficient than having each ReadTopic read its own messages.

Modifying Messages

All functions that return messages return them from some form of internal cache. This presents a risk: if any reader modifies a message, then it will be modified for all readers of that message. To safely modify a returned message, make your own copy with copy.copy(data).
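A minimal sketch of the copy-before-modify pattern follows. FakeMessage is a hypothetical stand-in for a topic message; a real message would come from a method such as get.

```python
import copy
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a topic message with one scalar field."""

    position: float = 0.0


cached = FakeMessage(position=1.5)  # Pretend this came from the internal cache.

alias = cached            # Not a copy: both names refer to the cached object.
mine = copy.copy(cached)  # A separate object that is safe to modify.
mine.position = 99.0

print(cached.position, mine.position, alias is cached)
```

Note that copy.copy makes a shallow copy, so in plain Python a list-valued field would still be shared between the copy and the original; the stand-in above avoids the issue by using only a scalar field.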

Attributes Summary

allow_multiple_callbacks

Can callbacks run simultaneously?

callback

Asynchronous callback function, or None if there is not one.

has_callback

Return True if there is a callback function.

has_data

Has any data ever been seen for this topic?

max_history

nqueued

Return the number of messages in the Python queue.

Methods Summary

aget([timeout])

Get the most recent message, or wait for data if no data has ever been seen (has_data False).

basic_close()

A synchronous and possibly less thorough version of close.

close()

Shut down and release resources.

flush()

Flush the queue used by get_oldest and next.

get()

Get the most recent message, or None if no data has ever been seen (has_data False).

get_oldest()

Pop and return the oldest message from the queue, or None if the queue is empty.

next(*, flush[, timeout])

Pop and return the oldest message from the queue, waiting for data if the queue is empty.

Attributes Documentation

allow_multiple_callbacks#

Can callbacks run simultaneously?

Notes

Ignored for synchronous callbacks because those block while running. In particular, if the callback is synchronous but launches one or more background jobs then the number of those jobs cannot be limited by this class.

callback#

Asynchronous callback function, or None if there is not one.

Synchronous callback functions are deprecated.

The callback function is called when a new message is received; it receives one argument: the message (an object of type topics.BaseTopic.DataType).

Raises:

TypeError – When setting a new callback if the callback is not None and is not callable.

Notes

Setting a callback flushes the queue, and it will remain empty as long as there is a callback.

get_oldest and next are prohibited if there is a callback function. Technically they could both work, but get_oldest would always return None and next would miss messages if they arrived while waiting for something else. It seems safer to raise an exception.
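The callback rules above can be modeled with a toy class. This is a hypothetical model, not salobj code: ToyReader, deliver, and the demo are invented for illustration, and flushing the queue when the callback is set to None is a choice of this sketch rather than something the source states.

```python
import asyncio
import collections


class ToyReader:
    """Hypothetical model of the callback rules; not salobj code."""

    def __init__(self):
        self._queue = collections.deque()
        self._callback = None

    @property
    def callback(self):
        return self._callback

    @callback.setter
    def callback(self, func):
        if func is not None and not callable(func):
            raise TypeError(f"callback {func!r} is not callable")
        self._callback = func
        self._queue.clear()  # Setting a callback flushes the queue.

    async def deliver(self, message):
        """Simulate the arrival of one message."""
        if self._callback is None:
            self._queue.append(message)
        else:
            await self._callback(message)  # The queue stays empty.

    def get_oldest(self):
        if self._callback is not None:
            raise RuntimeError("get_oldest is not allowed with a callback")
        return self._queue.popleft() if self._queue else None


async def demo():
    seen = []

    async def on_message(message):
        seen.append(message)

    reader = ToyReader()
    await reader.deliver("queued before the callback was set")
    reader.callback = on_message
    await reader.deliver("handled by the callback")
    try:
        reader.get_oldest()
        blocked = False
    except RuntimeError:
        blocked = True
    return seen, blocked


seen, blocked = asyncio.run(demo())
print(seen, blocked)
```

The message queued before the callback was set is discarded by the flush, so the callback only sees the later message.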

has_callback#

Return True if there is a callback function.

has_data#

Has any data ever been seen for this topic?

Raises:

RuntimeError – If the salinfo has not started reading.

max_history#

nqueued#

Return the number of messages in the Python queue.

Methods Documentation

async aget(timeout=None)#

Get the most recent message, or wait for data if no data has ever been seen (has_data False).

This method does not change which message will be returned by any other method (except that new data may arrive while waiting).

Parameters:

timeout (float, optional) – Time limit, in seconds. If None then no time limit.

Returns:

data – The current or next message.

Return type:

DataType

Raises:
  • asyncio.TimeoutError – If no message is available within the specified time limit.

  • RuntimeError – If a callback function is present, or if the salinfo has not started reading.

Notes

Do not modify the returned data. To make a copy that you can safely modify, use copy.copy(data).
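The "return the latest, or wait for the first" behavior of aget can be sketched with an asyncio.Event. ToyLatest and deliver are hypothetical names, and the callback and "not started" checks that raise RuntimeError are not modeled.

```python
import asyncio


class ToyLatest:
    """Hypothetical model of aget; not salobj code."""

    def __init__(self):
        self._latest = None
        self._has_data = asyncio.Event()

    def deliver(self, message):
        """Simulate the arrival of one message."""
        self._latest = message
        self._has_data.set()

    async def aget(self, timeout=None):
        # Returns immediately once any message has ever been seen;
        # otherwise waits, raising asyncio.TimeoutError on timeout.
        await asyncio.wait_for(self._has_data.wait(), timeout=timeout)
        return self._latest


async def demo():
    topic = ToyLatest()
    # No data yet: schedule a message to arrive shortly.
    asyncio.get_running_loop().call_later(0.01, topic.deliver, "first")
    waited = await topic.aget(timeout=1)
    # Data has been seen, so this call returns the same message at once.
    again = await topic.aget(timeout=1)

    empty = ToyLatest()
    try:
        await empty.aget(timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return waited, again, timed_out


waited, again, timed_out = asyncio.run(demo())
print(waited, again, timed_out)
```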

basic_close()#

A synchronous and possibly less thorough version of close.

Intended for exit handlers and constructor error handlers.

Return type:

None

async close()#

Shut down and release resources.

Intended to be called by SalInfo.close(), since that tracks all topics.

Return type:

None

flush()#

Flush the queue used by get_oldest and next.

This makes get_oldest return None and next wait, until a new message arrives. It does not change which message will be returned by aget or get.

Raises:

RuntimeError – If a callback function is present.

Return type:

None

get()#

Get the most recent message, or None if no data has ever been seen (has_data False).

This method does not change which message will be returned by aget, get_oldest, and next.

Returns:

data – Return self.data if data has been read, else None.

Return type:

self.DataType or None

Raises:

RuntimeError – If the salinfo has not started reading.

get_oldest()#

Pop and return the oldest message from the queue, or None if the queue is empty.

This is a variant of next that does not wait for a new message. This method affects which message will be returned by next, but not which message will be returned by aget or get.

Returns:

data – The oldest message found on the queue, if any, else None.

Return type:

self.DataType or None

Raises:

RuntimeError – If a callback function is present, or if the salinfo has not started reading.

Notes

Use with caution when mixing with next, since that also consumes data from the queue.
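The relationship between get, get_oldest, and flush can be modeled as one "latest" slot plus a FIFO queue. The class below is a hypothetical model for illustration, not the salobj implementation, and it leaves out the RuntimeError checks.

```python
import collections


class ToyTopicState:
    """Hypothetical model: one 'latest' slot plus a FIFO queue."""

    def __init__(self):
        self._latest = None
        self._queue = collections.deque()

    def deliver(self, message):
        """Simulate the arrival of one message."""
        self._latest = message
        self._queue.append(message)

    def get(self):
        return self._latest  # Does not touch the queue.

    def get_oldest(self):
        return self._queue.popleft() if self._queue else None

    def flush(self):
        self._queue.clear()  # The 'latest' slot is left alone.


topic = ToyTopicState()
for message in ("first", "second", "third"):
    topic.deliver(message)

results = [
    topic.get(),         # most recent message
    topic.get_oldest(),  # oldest message, consumed
    topic.get_oldest(),  # next oldest, consumed
    topic.get(),         # unchanged by get_oldest
]
topic.flush()
results += [topic.get_oldest(), topic.get()]
print(results)
```

After the flush, get_oldest finds nothing in the queue while get still reports the most recent message.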

async next(*, flush, timeout=None)#

Pop and return the oldest message from the queue, waiting for data if the queue is empty.

This method affects the data returned by get_oldest, but not the data returned by aget or get.

Parameters:
  • flush (bool) – If True then flush the queue before starting a read. This guarantees that the method will wait for a new message. If False and there is data on the queue, then pop and return the oldest message from the queue without waiting; if the queue is empty then wait for a new message.

  • timeout (float, optional) – Time limit, in seconds. If None then no time limit.

Returns:

data – The message data.

Return type:

DataType

Raises:
  • asyncio.TimeoutError – If no message is available within the specified time limit.

  • RuntimeError – If a callback function is present, or if the salinfo has not started reading.

Notes

Do not modify the returned data. To make a copy that you can safely modify, use copy.copy(data).
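The effect of the flush and timeout arguments can be sketched with an asyncio.Queue. ToyNextQueue and deliver are hypothetical names, the real class may be built differently, and the RuntimeError cases are not modeled.

```python
import asyncio


class ToyNextQueue:
    """Hypothetical model of next(flush=..., timeout=...); not salobj code."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def deliver(self, message):
        """Simulate the arrival of one message."""
        self._queue.put_nowait(message)

    async def next(self, *, flush, timeout=None):
        if flush:
            while not self._queue.empty():
                self._queue.get_nowait()  # Discard everything already queued.
        # wait_for raises asyncio.TimeoutError if nothing arrives in time.
        return await asyncio.wait_for(self._queue.get(), timeout=timeout)


async def demo():
    topic = ToyNextQueue()
    topic.deliver("old")

    # flush=False: the queued message is returned without waiting.
    first = await topic.next(flush=False, timeout=1)

    topic.deliver("stale")
    # Schedule a fresh message to arrive shortly after the flush.
    asyncio.get_running_loop().call_later(0.01, topic.deliver, "fresh")
    second = await topic.next(flush=True, timeout=1)

    try:
        await topic.next(flush=False, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first, second, timed_out


first, second, timed_out = asyncio.run(demo())
print(first, second, timed_out)
```

With flush=True the "stale" message is discarded, so the call waits for "fresh"; the final call times out because the queue is empty and nothing else arrives.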