ReadTopic
class lsst.ts.salobj.topics.ReadTopic(*, salinfo, name, sal_prefix, max_history, queue_len=100, filter_ackcmd=True)
Bases: lsst.ts.salobj.topics.BaseTopic
Base class for reading a topic.
Parameters:
- salinfo : SalInfo
SAL component information.
- name : str
Topic name, without a “command_” or “logevent_” prefix.
- sal_prefix : str
SAL topic prefix: one of “command_”, “logevent_” or “”.
- max_history : int
Maximum number of historical items to read:
  - 0 is required for commands, events, and the ackcmd topic
  - 1 is recommended for telemetry
- queue_len : int, optional
The maximum number of messages that can be read and not dealt with by a callback function or next before older messages will be dropped.
- filter_ackcmd : bool, optional
Filter out ackcmd messages so that we only see responses to commands that we sent? This is normally what you want, but it is not wanted for SAL/Kafka producers. Ignored if name != “ackcmd”.
Raises:
- ValueError
If max_history < 0.
- ValueError
If max_history > 0 and the topic is volatile (command or ackcmd).
- ValueError
If queue_len < MIN_QUEUE_LEN.
- ValueError
If max_history > queue_len.
- UserWarning
If max_history > DDS history queue depth or DDS durability service history depth for this topic. This is a warning rather than an exception, so that the DDS quality of service can be changed without breaking existing code.
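The ValueError conditions listed under Raises can be read as a short chain of checks. The following is a minimal sketch of those checks, not the library's code: the helper name `check_read_topic_args` is hypothetical, and the value of `MIN_QUEUE_LEN` is assumed purely for illustration.

```python
# Assumed value for illustration only; consult the library for the real constant.
MIN_QUEUE_LEN = 10


def check_read_topic_args(max_history, queue_len, volatile):
    """Hypothetical helper: raise ValueError for the cases listed under Raises."""
    if max_history < 0:
        raise ValueError(f"max_history={max_history} must be >= 0")
    if max_history > 0 and volatile:
        raise ValueError(f"max_history={max_history} must be 0 for a volatile topic")
    if queue_len < MIN_QUEUE_LEN:
        raise ValueError(f"queue_len={queue_len} must be >= {MIN_QUEUE_LEN}")
    if max_history > queue_len:
        raise ValueError(f"max_history={max_history} must be <= queue_len={queue_len}")


# A telemetry-style configuration passes all four checks.
check_read_topic_args(max_history=1, queue_len=100, volatile=False)
```

The UserWarning case is omitted because it depends on the DDS quality of service settings, which this sketch does not model.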
Notes
Queues
There are two queues: a Python queue whose length is set by queue_len and a DDS queue whose length is set by the DDS Quality of Service file. (The Python queue is needed because of limitations in the API for the OpenSplice DDS queue, including no access to the most recent message, no ability to ask how many messages are on the queue, and no asyncio support.) In the doc strings for the methods below, any reference to the queue refers to the Python queue.
Data can be lost from either queue:
- If this class cannot read messages from the DDS queue fast enough, then older messages will be dropped from the DDS queue. You will get several warning log messages as the DDS queue fills.
- As messages are read from the DDS queue they are put on the Python queue. If a callback function or next does not process data quickly enough then older messages are dropped from the Python queue. If you have a callback function then you will get several warning log messages as the Python queue fills up; you get no warning otherwise because ReadTopic has no way of knowing whether or not you intend to read all messages.
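The "older messages are dropped" behavior of a bounded queue can be illustrated with the standard library alone. This is a sketch of the general technique using `collections.deque`; whether ReadTopic uses a deque internally is an assumption, and the small `queue_len` is chosen only to keep the example short.

```python
import collections

# Hypothetical stand-in for the Python queue; queue_len mirrors the constructor argument.
queue_len = 3
python_queue = collections.deque(maxlen=queue_len)

for message in ["m1", "m2", "m3", "m4", "m5"]:
    # Appending to a full bounded deque silently discards the oldest item.
    python_queue.append(message)

remaining = list(python_queue)
print(remaining)  # ['m3', 'm4', 'm5']
```

A reader that falls behind therefore sees only the newest queue_len messages.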
Reading
Reading is performed by the contained SalInfo, which has a single read loop that reads messages for all topics. This is more efficient than having each ReadTopic read its own messages.
Modifying Messages
All functions that return messages return them from some form of internal cache. This presents a risk: if any reader modifies a message, then it will be modified for all readers of that message. To safely modify a returned message, make your own copy with copy.copy(data).
Attributes:
- isopen : bool
- dds_queue_length_checker : QueueCapacityChecker
Queue length checker for the DDS queue.
- python_queue_length_checker : QueueCapacityChecker
Queue length checker for the Python queue.
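To illustrate the "Modifying Messages" note above, here is a small sketch of copying a cached message before changing it. `FakeMessage` is a hypothetical stand-in for a topic's DataType, and the list-valued field is an assumption made to show that `copy.copy` is a shallow copy.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a message class; not a real salobj type."""

    duration: float = 0.0
    values: list = field(default_factory=list)


cached = FakeMessage(duration=2.0, values=[1, 2, 3])

mine = copy.copy(cached)
mine.duration = 5.0  # rebinding a field changes only the copy

# copy.copy is shallow, so a mutable field is still shared with the cached message.
shares_list = mine.values is cached.values

# Build a new list instead of mutating the shared one in place.
mine.values = [*mine.values, 4]
```

If a message has mutable fields that you need to change in place, `copy.deepcopy` avoids the sharing shown above.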
Attributes Summary
- DataType : The type (class) for a message of this topic.
- allow_multiple_callbacks : Can callbacks run simultaneously?
- callback : Callback function, or None if there is not one.
- has_callback : Return True if there is a callback function.
- has_data : Has any data ever been seen for this topic?
- max_history
- metadata : Get topic metadata as a TopicMetadata, if available, else None.
- nqueued : Return the number of messages in the Python queue.
- volatile : Does this topic have volatile durability?
Methods Summary
- aget([timeout]) : Get the most recent message, or wait for data if no data has ever been seen (has_data False).
- basic_close() : A synchronous and possibly less thorough version of close.
- close() : Shut down and release resources.
- flush() : Flush the queue used by get_oldest and next.
- get([flush]) : Get the most recent message, or None if no data has ever been seen (has_data False).
- get_oldest() : Pop and return the oldest message from the queue, or None if the queue is empty.
- next(*, flush[, timeout]) : Pop and return the oldest message from the queue, waiting for data if the queue is empty.
Attributes Documentation
-
DataType
The type (class) for a message of this topic.
When you read or write a message for this topic you are reading or writing an instance of DataType.
Notes
The preferred way to write a message for a topic is:
- RemoteCommand.set_start to start a command.
- CommandEvent.set_put to write an event.
- CommandTelemetry.set_put to write a telemetry message.
However, it is also possible to use DataType to create a message, then write it, as separate operations. For example, assuming we have a Remote for SAL component “Test”:
    # The preferred way to issue a command:
    await remote.cmd_wait.set_start(duration=2, timeout=5)

    # But an alternative is to first create the command,
    # then send it, as two separate operations:
    message = remote.cmd_wait.DataType(duration=2)
    await remote.cmd_wait.start(message, timeout=5)

    # Or, even more verbosely:
    message = remote.cmd_wait.DataType()
    message.duration = 2
    await remote.cmd_wait.start(message, timeout=5)
-
allow_multiple_callbacks
Can callbacks run simultaneously?
Notes
Ignored for synchronous callbacks because those block while running. In particular, if the callback is synchronous but launches one or more background jobs then the number of those jobs cannot be limited by this class.
-
callback
Callback function, or None if there is not one.
The callback function is called when a new message is received; it receives one argument: the message (an object of type topics.BaseTopic.DataType).
Raises:
- TypeError
When setting a new callback if the callback is not None and is not callable.
Notes
The callback function can be synchronous or asynchronous (e.g. defined with async def).
Setting a callback flushes the queue, and it will remain empty as long as there is a callback.
get_oldest and next are prohibited if there is a callback function. Technically they could both work, but get_oldest would always return None and next would miss messages if they arrived while waiting for something else. It seems safer to raise an exception.
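Accepting either a synchronous or an asynchronous callback is a common asyncio pattern. The sketch below shows one way to do it with the standard library; `run_callback` is a hypothetical helper and is not claimed to match how ReadTopic dispatches callbacks.

```python
import asyncio
import inspect


async def run_callback(callback, message):
    """Hypothetical helper: call a sync or async callback with one message."""
    if not callable(callback):
        raise TypeError(f"callback {callback!r} is not callable")
    result = callback(message)
    if inspect.isawaitable(result):
        # An "async def" callback returns a coroutine, which must be awaited.
        await result


received = []


def sync_callback(message):
    received.append(("sync", message))


async def async_callback(message):
    await asyncio.sleep(0)  # yield to the event loop, as real async work would
    received.append(("async", message))


async def main():
    await run_callback(sync_callback, "m1")
    await run_callback(async_callback, "m2")


asyncio.run(main())
```

Note that the synchronous callback blocks the event loop while it runs, which is why long-running work is usually better placed in an asynchronous callback.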
-
has_callback
Return True if there is a callback function.
-
has_data
Has any data ever been seen for this topic?
Raises:
- RuntimeError
If the salinfo has not started reading.
-
max_history
-
nqueued
Return the number of messages in the Python queue.
-
volatile
Does this topic have volatile durability?
Methods Documentation
-
aget(timeout=None)
Get the most recent message, or wait for data if no data has ever been seen (has_data False).
This method does not change which message will be returned by any other method (except for the fact that new data will arrive while waiting).
Parameters:
- timeout : float, optional
Time limit, in seconds. If None then no time limit.
Returns:
- data : DataType
The current or next message.
Raises:
- RuntimeError
If a callback function is present, or if the salinfo has not started reading.
Notes
Do not modify the returned data. To make a copy that you can safely modify, use copy.copy(data).
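The "return the latest message, or wait if none has been seen" behavior described for aget can be modeled with an `asyncio.Event`. This is a toy sketch under stated assumptions: `LatestValue` is a hypothetical class, and raising `asyncio.TimeoutError` on timeout is a property of `asyncio.wait_for`, not a documented property of aget.

```python
import asyncio


class LatestValue:
    """Toy holder for the most recent message; not the salobj implementation."""

    def __init__(self):
        self._data = None
        self._has_data = asyncio.Event()

    def put(self, message):
        self._data = message
        self._has_data.set()

    async def aget(self, timeout=None):
        # Returns at once if any data has been seen; otherwise waits for put.
        await asyncio.wait_for(self._has_data.wait(), timeout=timeout)
        return self._data


async def demo():
    latest = LatestValue()
    try:
        await latest.aget(timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    # Deliver a message shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, latest.put, "m1")
    first = await latest.aget(timeout=1)
    again = await latest.aget(timeout=1)  # reading does not consume the message
    return timed_out, first, again


result = asyncio.run(demo())
```

Because reading does not consume anything, repeated calls keep returning the same message until a newer one arrives.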
-
basic_close()
A synchronous and possibly less thorough version of close.
Intended for exit handlers and constructor error handlers.
-
close()
Shut down and release resources.
Intended to be called by SalInfo.close(), since that tracks all topics.
-
flush()
Flush the queue used by get_oldest and next.
This makes get_oldest return None and next wait, until a new message arrives. It does not change which message will be returned by aget or get.
Raises:
- RuntimeError
If a callback function is present.
-
get(flush=None)
Get the most recent message, or None if no data has ever been seen (has_data False).
This method does not change which message will be returned by aget. If flush=False this method also does not modify which message will be returned by get_oldest and next.
Parameters:
- flush : bool, optional
Flush the queue? Flushing the queue is deprecated and so is specifying this argument. False (the default) leaves the cache alone, which has no effect on the messages returned by any read method. True affects which messages will be returned by get_oldest and next. True has no effect if there is a callback function. Note: None is treated as False, but please do not specify it; it is only supported for now to handle deprecation warnings.
Returns:
- data : DataType or None
The most recent message, or None if no data has ever been seen.
Raises:
- RuntimeError
If the salinfo has not started reading.
-
get_oldest()
Pop and return the oldest message from the queue, or None if the queue is empty.
This is a variant of next that does not wait for a new message. This method affects which message will be returned by next, but not which message will be returned by aget or get.
Returns:
- data : DataType or None
The oldest message on the queue, or None if the queue is empty.
Raises:
- RuntimeError
If a callback function is present, or if the salinfo has not started reading.
Notes
Use with caution when mixing with next, since that also consumes data from the queue.
-
next(*, flush, timeout=None)
Pop and return the oldest message from the queue, waiting for data if the queue is empty.
This method affects the data returned by get_oldest, but not the data returned by aget or get.
Parameters:
- flush : bool
If True then flush the queue before starting a read. This guarantees that the method will wait for a new message. If False and there is data on the queue, then pop and return the oldest message from the queue, without waiting; if the queue is empty then wait for a new message.
- timeout : float, optional
Time limit, in seconds. If None then no time limit.
Returns:
- data : DataType
The message data.
Raises:
- RuntimeError
If a callback function is present, or if the salinfo has not started reading.
Notes
Do not modify the returned data. To make a copy that you can safely modify, use copy.copy(data).
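The way the read methods interact (get and aget look at the most recent message, while get_oldest, next and flush act on the queue) can be summarized with a toy model. `ToyReader` is a hypothetical class written for this illustration; it leaves out the waiting behavior of next and aget and is not the salobj implementation.

```python
import collections


class ToyReader:
    """Toy model of the read methods; not the salobj implementation."""

    def __init__(self, queue_len=100):
        self._latest = None
        self._queue = collections.deque(maxlen=queue_len)

    def put(self, message):
        # Every new message updates the cached value and joins the queue.
        self._latest = message
        self._queue.append(message)

    def get(self):
        return self._latest

    def get_oldest(self):
        return self._queue.popleft() if self._queue else None

    def flush(self):
        self._queue.clear()


reader = ToyReader()
for message in ("m1", "m2", "m3"):
    reader.put(message)

oldest = reader.get_oldest()       # consumes "m1" from the queue
latest = reader.get()              # "m3"; the queue is left untouched
reader.flush()                     # discards "m2" and "m3" from the queue
after_flush = reader.get_oldest()  # None, because the queue is now empty
still_latest = reader.get()        # "m3"; flush does not affect get
```

In this model a next call with flush=True would correspond to calling flush and then waiting for the following put.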