ReadTopic¶
-
class
lsst.ts.salobj.topics.ReadTopic(*, salinfo, name, sal_prefix, max_history, queue_len=100, filter_ackcmd=True)¶ Bases:
lsst.ts.salobj.topics.BaseTopicBase class for reading a topic.
Parameters: - salinfo :
SalInfo SAL component information
- name :
str Topic name, without a “command_” or “logevent_” prefix.
- sal_prefix :
str SAL topic prefix: one of “command_”, “logevent_” or “”
- max_history :
int Maximum number of historical items to read:
- 0 is required for commands and the ackcmd reader
- 1 is recommended for events and telemetry
- queue_len :
int, optional The maximum number of messages that can be read and not dealt with by a callback function or
nextbefore older messages will be dropped.- filter_ackcmd :
bool, optional Filter out cmdack topics so we only see responses to commands that we sent? This is normally what you want, but it is not wanted for SAL/Kafka producers. Ignored if
name!= “ackcmd”.
Raises: - ValueError
If max_history < 0.
- ValueError
If max_history > 0 and the topic is volatile (command or ackcmd).
- ValueError
If queue_len <= 0.
- ValueError
If max_history > queue_len.
Notes
Queues
There are actually two queues: an internal queue whose length is set by
queue_lenand a dds queue whose length is set by low level configuration. Data can be lost in two ways:- If this class cannot read messages from the dds queue fast enough, then older messages will be dropped from the dds queue. You will get a warning log message if the reader starts to fall behind.
- As messages are read they are put on the internal queue.
If a callback function or
nextdoes not process data quickly enough then older messages are dropped from the internal queue. If you have a callback function then you will get several warning log messages as this internal queue fills up. You get no warning otherwise because this class has no way of knowing whether or not you intend to read all messages.
Reading
Reading is performed by the
SalInfowhich has single read loop that reads messages for all topics. This is more efficient than having eachReadTopicread its own messages.Modifying Messages
All functions that return messages return from an internal cache. This cached value is replaced by all read operations. So as long as no other code modifies a message, the returned message will not change.
To safely modify a returned message, make your own copy with
copy.copy(data).Attributes: - isopen :
bool - dds_queue_length_checker :
QueueCapacityChecker Queue length checker for the DDS queue.
- python_queue_length_checker :
QueueCapacityChecker: Queue length checker for the Python queue.
Attributes Summary
DataTypeThe type (class) for a message of this topic. allow_multiple_callbacksCan callbacks can run simultaneously? callbackCallback function, or None if there is not one. has_callbackReturn True if there is a callback function. has_dataHas any data been seen for this topic? max_historymetadataGet topic metadata as a TopicMetadata, if available, elseNone.Methods Summary
aget([timeout])Get the current message, if any, else wait for the next message. close()Shut down and release resources. flush()Flush the queue of unread data. get([flush])Get the most recently seen message, or Noneif no data ever seen.get_oldest()Pop and return the oldest message from the queue, or Noneif the queue is empty.next(*, flush[, timeout])Wait for a message, possibly returning the oldest queued message. Attributes Documentation
-
DataType¶ The type (class) for a message of this topic.
When you read or write a message for this topic you are reading or writing an instance of
DataType.Notes
The preferred way to write a message for a topic is:
RemoteCommand.set_startto start a command.CommandEvent.set_putto write an event.CommandTelemetry.set_putto write a telemetry message.
However, it is also possible to use
DataTypeto create a message, then write, it as separate operations. For example, assuming we have aRemotefor SAL component “Test”:# The preferred way to issue a command: await = remote.cmd_wait.set_put(duration=2, timeout=5) # But an alternative is to first create the command, # then send it, as two separate operations: message = remote.cmd_wait.DataType(duration=2) await remote.cmd_wait.start(message, timeout=5) # Or, even more verbosely: message = remote.cmd_wait.DataType() message.duration = 2 await remote.cmd_wait.start(message, timeout=5)
-
allow_multiple_callbacks¶ Can callbacks can run simultaneously?
Notes
Ignored for synchronous callbacks because those block while running. In particular, if the callback is synchronous but launches one or more background jobs then the number of those jobs cannot be limited by this class.
-
callback¶ Callback function, or None if there is not one.
The callback function is called when a new message is received; it receives one argument: the message (an object of type
topics.BaseTopic.DataType).Raises: - TypeError
When setting a new callback if the callback is not None and is not callable.
Notes
The callback function can be synchronous or asynchronous (e.g. defined with
async def).Setting a callback flushes the queue, and it will remain empty as long as there is a callback.
get_oldestandnextare prohibited if there is a callback function. Technically they could both work, butget_oldestwould always returnNoneandnextwould miss messages if they arrived while waiting for something else. It seemed safer to just raise an exception.
-
has_callback¶ Return True if there is a callback function.
-
has_data¶ Has any data been seen for this topic?
Raises: - RuntimeError
If the
salinfohas not started reading.
-
max_history¶
Methods Documentation
-
aget(timeout=None)¶ Get the current message, if any, else wait for the next message.
Parameters: - timeout :
float, optional Time limit, in seconds. If None then no time limit.
Returns: - data :
DataType The current or next message.
Raises: - RuntimeError
If a callback function is present, or if the
salinfohas not started reading.
Notes
Do not modify the returned data. To make a copy that you can safely modify, use
copy.copy(data).- timeout :
-
close()¶ Shut down and release resources.
Intended to be called by SalInfo.close(), since that tracks all topics.
-
flush()¶ Flush the queue of unread data.
Raises: - RuntimeError
If a callback function is present.
-
get(flush=True)¶ Get the most recently seen message, or
Noneif no data ever seen.Parameters: Returns: Raises: - RuntimeError
If the
salinfohas not started reading.
-
get_oldest()¶ Pop and return the oldest message from the queue, or
Noneif the queue is empty.Returns: Raises: - RuntimeError
If a callback function is present, or if the
salinfohas not started reading.
Notes
Use with caution when mixing with
next, since that also consumes data from the queue.
-
next(*, flush, timeout=None)¶ Wait for a message, possibly returning the oldest queued message.
Parameters: Returns: - data :
DataType The message data.
Raises: - RuntimeError
If a callback function is present, or if the
salinfohas not started reading.
Notes
Do not modify the returned data. To make a copy that you can safely modify, use
copy.copy(data).- data :
- salinfo :