ReadTopic
- class lsst.ts.salobj.topics.ReadTopic(*, salinfo: SalInfo, attr_name: str, max_history: int, queue_len: int = 100)
Bases: BaseTopic
Base class for reading a topic.
- Parameters:
- salinfo
SalInfo SAL component information
- attr_name
str Topic name with attribute prefix. The prefix must be one of: cmd_, evt_, tel_, or (only for the ackcmd topic) ack_.
- max_history
int Maximum number of historical items to read:
0 is required for commands, events, and the ackcmd topic.
1 is recommended for telemetry. For an indexed component it is possible for data from one index to push data for another index off the DDS queue, so historical data is not guaranteed.
For the special case of reading an indexed SAL component with index=0 (read all indices) the only allowed values are 0 or 1. If 1 then retrieve the most recent sample for each index that is still in the read queue, in the order received. max_history > 1 is forbidden, because it is difficult to implement.
- queue_len
int, optional The maximum number of messages that can be read and not dealt with by a callback function or next before older messages will be dropped.
- Attributes:
- isopen
bool Is this read topic open? True until close or basic_close is called.
- python_queue_length_checker
QueueCapacityChecker: Queue length checker for the Python queue.
- Raises:
- ValueError
If max_history < 0.
- ValueError
If queue_len < MIN_QUEUE_LEN.
- ValueError
If max_history > queue_len.
- ValueError
For an indexed component, if index=0 and max_history > 1. Reading more than one historical sample per index is more trouble than it is worth.
- UserWarning
If max_history > DDS history queue depth or DDS durability service history depth for this topic. This is a warning rather than an exception, so that the DDS quality of service can be changed without breaking existing code.
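The ValueError conditions listed above can be summarized in a small standalone sketch. The function name `check_read_topic_args`, its `indexed` and `index` parameters, and the value chosen for `MIN_QUEUE_LEN` are hypothetical stand-ins for illustration, not the salobj implementation. The DDS depth warning is left out because it depends on the quality of service of the topic.

```python
# Hypothetical value for illustration only; the real MIN_QUEUE_LEN is defined by salobj.
MIN_QUEUE_LEN = 10


def check_read_topic_args(
    *, max_history: int, queue_len: int = 100, indexed: bool = False, index: int = 0
) -> None:
    """Mirror the documented ValueError conditions (not the library's own code)."""
    if max_history < 0:
        raise ValueError(f"max_history={max_history} must be >= 0")
    if queue_len < MIN_QUEUE_LEN:
        raise ValueError(f"queue_len={queue_len} must be >= {MIN_QUEUE_LEN}")
    if max_history > queue_len:
        raise ValueError(f"max_history={max_history} must be <= queue_len={queue_len}")
    if indexed and index == 0 and max_history > 1:
        raise ValueError("max_history must be 0 or 1 when reading all indices")


# Telemetry with one historical sample passes all checks.
check_read_topic_args(max_history=1)

# Reading all indices of an indexed component with max_history=2 is rejected.
try:
    check_read_topic_args(max_history=2, indexed=True, index=0)
except ValueError as e:
    print(f"rejected: {e}")
```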
Notes
There is a queue for data whose length is set by queue_len. Data can be lost from this queue: if a callback function or next does not process data quickly enough, older messages are dropped from the Python queue. If you have a callback function then you will get several warning log messages as the Python queue fills up; you get no warning otherwise, because ReadTopic has no way of knowing whether or not you intend to read all messages.
Reading
Reading is performed by the contained SalInfo, which has a single read loop that reads messages for all topics. This is more efficient than having each ReadTopic read its own messages.
Modifying Messages
All functions that return messages return them from some form of internal cache. This presents a risk: if any reader modifies a message, then it will be modified for all readers of that message. To safely modify a returned message, make your own copy with copy.copy(data).
Attributes Summary
DataType: The type (class) for a message of this topic.
allow_multiple_callbacks: Can callbacks run simultaneously?
attr_name: Get the salobj topic attribute name, e.g. evt_summaryState.
callback: Asynchronous callback function, or None if there is not one.
has_callback: Return True if there is a callback function.
has_data: Has any data ever been seen for this topic?
nqueued: Return the number of messages in the Python queue.
sal_name: Get the SAL topic name, e.g. logevent_summaryState.
Methods Summary
aget([timeout]): Get the most recent message, or wait for data if no data has ever been seen (has_data False).
basic_close(): A synchronous and possibly less thorough version of close.
close(): Shut down and release resources.
flush(): Flush the queue used by get_oldest and next.
get(): Get the most recent message, or None if no data has ever been seen (has_data False).
get_oldest(): Pop and return the oldest message from the queue, or None if the queue is empty.
next(*, flush[, timeout]): Pop and return the oldest message from the queue, waiting for data if the queue is empty.
Attributes Documentation
- DataType
The type (class) for a message of this topic.
When you read or write a message for this topic you are reading or writing an instance of DataType.
Notes
The preferred way to set data and write a message for a topic is:
  - RemoteCommand.set_start to start a command.
  - CommandEvent.set_write to write an event.
  - CommandTelemetry.set_write to write a telemetry message.
- allow_multiple_callbacks
Can callbacks run simultaneously?
Notes
Ignored for synchronous callbacks because those block while running. In particular, if the callback is synchronous but launches one or more background jobs then the number of those jobs cannot be limited by this class.
- attr_name
Get the salobj topic attribute name, e.g. evt_summaryState.
- callback
Asynchronous callback function, or None if there is not one.
Synchronous callback functions are deprecated.
The callback function is called when a new message is received; it receives one argument: the message (an object of type topics.BaseTopic.DataType).
- Raises:
- TypeError
When setting a new callback if the callback is not None and is not callable.
Notes
Setting a callback flushes the queue, and it will remain empty as long as there is a callback.
get_oldest and next are prohibited if there is a callback function. Technically they could both work, but get_oldest would always return None and next would miss messages if they arrived while waiting for something else. It seems safer to raise an exception.
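The callback rules above can be illustrated with a toy stand-in. `ToyTopic` and its `deliver` method are hypothetical and exist only for this sketch; `deliver` plays the role of the read loop, and the class is not the salobj implementation. Whether clearing the callback (setting it to None) also flushes the queue is not stated above, so the sketch flushes only when a callback is installed.

```python
import asyncio
import collections
import types


class ToyTopic:
    """Toy stand-in that follows the documented callback rules."""

    def __init__(self):
        self._queue = collections.deque()
        self._callback = None

    @property
    def callback(self):
        return self._callback

    @callback.setter
    def callback(self, func):
        if func is not None and not callable(func):
            raise TypeError(f"callback {func!r} is not callable")
        self._callback = func
        if func is not None:
            self._queue.clear()  # setting a callback flushes the queue

    async def deliver(self, msg):
        """Hypothetical hook standing in for the read loop."""
        if self._callback is not None:
            await self._callback(msg)  # the callback receives exactly one argument
        else:
            self._queue.append(msg)

    def get_oldest(self):
        if self._callback is not None:
            raise RuntimeError("get_oldest is not allowed while a callback is set")
        return self._queue.popleft() if self._queue else None


async def demo():
    topic = ToyTopic()
    await topic.deliver(types.SimpleNamespace(summaryState=1))  # goes to the queue
    seen = []

    async def on_message(msg):
        seen.append(msg.summaryState)

    topic.callback = on_message  # discards the queued message
    await topic.deliver(types.SimpleNamespace(summaryState=2))  # goes to the callback
    try:
        topic.get_oldest()
    except RuntimeError:
        blocked = True
    else:
        blocked = False
    return seen, blocked


print(asyncio.run(demo()))  # ([2], True)
```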
- has_callback
Return True if there is a callback function.
- has_data
Has any data ever been seen for this topic?
- Raises:
- RuntimeError
If the salinfo has not started reading.
- max_history
- nqueued
Return the number of messages in the Python queue.
- sal_name
Get the SAL topic name, e.g. logevent_summaryState.
Methods Documentation
- async aget(timeout: float | None = None) -> BaseMsgType
Get the most recent message, or wait for data if no data has ever been seen (has_data False).
This method does not change which message will be returned by any other method (except for the fact that new data will arrive while waiting).
- Parameters:
- timeout
float, optional Time limit, in seconds. If None then no time limit.
- Returns:
- data
DataType The current or next message.
- Raises:
- asyncio.TimeoutError
If no message is available within the specified time limit.
- RuntimeError
If a callback function is present, or if the salinfo has not started reading.
Notes
Do not modify the returned data. To make a copy that you can safely modify, use copy.copy(data).
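As a minimal sketch of why the copy matters, the example below uses a `types.SimpleNamespace` as a stand-in for a cached message (real messages are instances of the topic's DataType). The field name `summaryState` is borrowed from the evt_summaryState example and is only illustrative.

```python
import copy
import types

# Stand-in for a message held in the internal cache.
cached = types.SimpleNamespace(summaryState=2)

reader_a = cached  # what a reader gets back: a reference to the cached object
reader_b = cached  # a second reader sees the very same object

# Safe: modify a shallow copy, leaving the cached message untouched.
mine = copy.copy(reader_a)
mine.summaryState = 5
assert cached.summaryState == 2

# Unsafe: modifying the returned object changes it for every reader.
reader_a.summaryState = 99
assert reader_b.summaryState == 99
```

Note that copy.copy makes a shallow copy, so mutable field values such as lists are still shared between the copy and the original.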
- basic_close() -> None
A synchronous and possibly less thorough version of close.
Intended for exit handlers and constructor error handlers.
- async close() -> None
Shut down and release resources.
Intended to be called by SalInfo.close(), since that tracks all topics.
- flush() -> None
Flush the queue used by get_oldest and next.
This makes get_oldest return None and next wait, until a new message arrives. It does not change which message will be returned by aget or get.
- Raises:
- RuntimeError
If a callback function is present.
- get() -> BaseMsgType | None
Get the most recent message, or None if no data has ever been seen (has_data False).
This method does not change which message will be returned by aget, get_oldest, and next.
- get_oldest() -> BaseMsgType | None
Pop and return the oldest message from the queue, or None if the queue is empty.
This is a variant of next that does not wait for a new message. This method affects which message will be returned by next, but not which message will be returned by aget or get.
- Returns:
- Raises:
- RuntimeError
If a callback function is present, or if the salinfo has not started reading.
Notes
Use with caution when mixing with next, since that also consumes data from the queue.
- async next(*, flush: bool, timeout: float | None = None) -> BaseMsgType
Pop and return the oldest message from the queue, waiting for data if the queue is empty.
This method affects the data returned by get_oldest, but not the data returned by aget or get.
- Parameters:
- flush
bool If True then flush the queue before starting a read. This guarantees that the method will wait for a new message. If False and there is data on the queue, then pop and return the oldest message from the queue, without waiting; if the queue is empty then wait for a new message.
- timeout
float, optional Time limit, in seconds. If None then no time limit.
- Returns:
- data
DataType The message data.
- Raises:
- asyncio.TimeoutError
If no message is available within the specified time limit.
- RuntimeError
If a callback function is present, or if the salinfo has not started reading.
Notes
Do not modify the returned data. To make a copy that you can safely modify, use copy.copy(data).
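To see how get, get_oldest, next and flush interact, here is a toy model built only on the standard library. `ToyReader` and its `put` method are hypothetical names used for this sketch (`put` stands in for the read loop), and the queue length of 3 is deliberately tiny to show older messages being dropped. It is a sketch of the documented semantics, not the salobj implementation, and it omits callbacks and the "has not started reading" checks.

```python
import asyncio
import collections


class ToyReader:
    """Toy model of the documented get/get_oldest/next/flush semantics."""

    def __init__(self, queue_len=100):
        # A bounded deque silently drops the oldest item when full.
        self._queue = collections.deque(maxlen=queue_len)
        self._current = None
        self._new_data = asyncio.Event()

    def put(self, msg):
        """Hypothetical hook standing in for the read loop."""
        self._current = msg
        self._queue.append(msg)
        self._new_data.set()

    def get(self):
        return self._current  # most recent message; the queue is untouched

    def get_oldest(self):
        return self._queue.popleft() if self._queue else None

    def flush(self):
        self._queue.clear()

    async def next(self, *, flush, timeout=None):
        if flush:
            self.flush()
        while not self._queue:
            self._new_data.clear()
            await asyncio.wait_for(self._new_data.wait(), timeout=timeout)
        return self._queue.popleft()


async def demo():
    reader = ToyReader(queue_len=3)  # created inside the running event loop
    for value in range(5):  # 5 messages into a queue of 3: 0 and 1 are dropped
        reader.put(value)
    results = {
        "get": reader.get(),  # 4: most recent message
        "get_oldest": reader.get_oldest(),  # 2: oldest surviving message
        "next": await reader.next(flush=False),  # 3: no wait, data is queued
    }
    # flush=True discards message 4 and waits for the next arrival.
    asyncio.get_running_loop().call_later(0.01, reader.put, 5)
    results["next_flushed"] = await reader.next(flush=True, timeout=1)
    results["get_after"] = reader.get()
    return results


print(asyncio.run(demo()))
```

In this model get always reports the latest message regardless of what next or get_oldest have consumed, which matches the statements above that those methods do not affect each other's results.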