SalInfo#

class lsst.ts.salobj.SalInfo(domain, name, index=0, write_only=False, num_messages=1, consume_messages_timeout=0.1)#

Bases: object

Information for one SAL component and index.

Parameters:
  • domain (Domain) – Domain information.

  • name (str) – SAL component name.

  • index (int, optional) – Component index; 0 or None if this component is not indexed.

  • write_only (bool) – If False this SalInfo will not subscribe to any topics.

  • num_messages (int) – Number of messages to consume in the read loop.

  • consume_messages_timeout (float) – Timeout to wait for new messages to arrive in the read loop.

Raises:
domain#

The domain constructor argument.

Type:

Domain

index#

The index constructor argument.

Type:

int

num_messages#

Number of messages to consume in the read loop.

Type:

int

consume_messages_timeout#

Timeout to wait for new messages to arrive in the read loop.

Type:

float

identity#

Value used for the private_identity field of DDS messages. Defaults to username@host, but CSCs should use the CSC name: * SAL_component_name for a non-indexed SAL component * SAL_component_name:index for an indexed SAL component.

Type:

str

isopen#

Is this read topic open? True until close is called.

Type:

bool

log#

A logger.

Type:

logging.Logger

start_called#

Has the start method been called? This instance is fully started when start_task is done.

Type:

bool

done_task#

A task which is finished when close is done.

Type:

asyncio.Task

start_task#

A task which is finished when start is done, or to an exception if start fails.

Type:

asyncio.Task

command_names#

A tuple of command names without the "command_" prefix.

Type:

List [str]

event_names#

A tuple of event names, without the "logevent_" prefix

Type:

List [str]

telemetry_names#

A tuple of telemetry topic names.

Type:

List [str]

sal_topic_names#

A tuple of SAL topic names, e.g. “logevent_summaryState”, in alphabetical order. This is needed to determine command ID.

Type:

List [str]

component_info#

Information about the SAL component and its topics.

Type:

ComponentInfo

Notes

Reads the following Environment Variables; follow the link for details:

  • LSST_TOPIC_SUBNAME (required): a component of Kafka topic names and schema namespaces.

  • LSST_DDS_ENABLE_AUTHLIST (optional): if set to “1” enable authlist-based command authorization. If “0” or undefined, do not enable authorization.

  • LSST_KAFKA_BROKER_ADDR (optional): address of Kafka broker. Defaults to broker:29092 (matching the value in file docker-compose.yaml), for unit tests.

  • LSST_SCHEMA_REGISTRY_URL (optional): url of the Confluent schema registry. Defaults to http://schema-registry:8081 (matching the value in file docker-compose.yaml), for unit tests.

  • LSST_KAFKA_SECURITY_USERNAME (optional): Username to authenticate with the kafka broker.

  • LSST_KAFKA_SECURITY_PASSWORD (optional): Password to authenticate with the kafka broker.

  • LSST_KAFKA_SECURITY_PROTOCOL (optional): Authentication protocol to use with the kafka broker.

  • LSST_KAFKA_SECURITY_MECHANISM (optional): Authentication mechanism to use with the kafka broker.

  • LSST_KAFKA_REPLICATION_FACTOR (optional): Replication factor to use when creating topics.

  • LSST_KAFKA_PRODUCER_WAIT_ACKS (optional): The number of acknowledgments the producer requires the leader to have received before considering a request complete.

Usage

  • Construct a SalInfo object for a particular SAL component and index.

  • Use the object to construct all topics (subclasses of topics.BaseTopic) that you want to use with this SAL component and index.

  • Call start.

  • When you are finished, call close, or at least be sure to close the domain when you are finished with all classes that use it (see Cleanup below).

You cannot read or write topics constructed with a SalInfo object until you call start, and once you call start, you cannot use the SalInfo object to construct any more topics.

You may use SalInfo as an async context manager, but this is primarily useful for cleanup. After you enter the context (create the object) you will still have to create topics and call start. This is different from Domain, Controller, and Remote, which are ready to use when you enter the context.

Cleanup

Each SalInfo automatically registers itself with the specified domain for cleanup, using a weak reference to avoid circular dependencies. You may safely close a SalInfo before closing its domain, and this is recommended if you create and destroy many remotes. In any case, be sure to close the domain when you are done.

Attributes Summary

AckCmdType

The class of command acknowledgement.

indexed

Is this SAL component indexed?.

name

Get the SAL component name (the name constructor argument).

name_index

index].

running

Return True if started and not closed.

started

Return True if successfully started, False otherwise.

Methods Summary

add_reader(topic)

Add a ReadTopic, so it can be read by the read loop and closed by close.

add_writer(topic)

Add a WriteTopic, so it can be closed by close.

assert_running()

Raise RuntimeError if not running.

assert_started()

Raise RuntimeError if not successfully started.

basic_close()

A synchronous and less thorough version of close.

close([cancel_run_kafka_task])

Shut down and clean up resources.

flush_loop()

Constantly call flush to force data to be delivered.

get_broker_client_configuration()

Get the broker client configuration.

get_max_history_for_indexed_component()

Get the max history size for an indexed component.

make_ackcmd(private_seqNum, ack[, error, ...])

Make an AckCmdType object from keyword arguments.

start()

Start the read loop.

write_data(topic_info, data_dict)

Write a message.

Attributes Documentation

AckCmdType#

The class of command acknowledgement.

It includes these fields, as well as the usual other private fields.

private_seqNumint

Sequence number of command.

ackint

Acknowledgement code; one of the SalRetCode CMD_ constants, such as SalRetCode.CMD_COMPLETE.

errorint

Error code; 0 for no error.

resultstr

Explanatory message, or “” for no message.

Raises:

RuntimeError – If the SAL component has no commands (because if there are no commands then there is no ackcmd topic).

indexed#

Is this SAL component indexed?.

name#

Get the SAL component name (the name constructor argument).

name_index#

index].

The suffix is only present if the component is indexed.

Type:

Get name[

running#

Return True if started and not closed.

started#

Return True if successfully started, False otherwise.

Methods Documentation

add_reader(topic)#

Add a ReadTopic, so it can be read by the read loop and closed by close.

Parameters:

topic (topics.ReadTopic) – Topic to read and (eventually) close.

Raises:

RuntimeError – If called after start has been called.

Return type:

None

add_writer(topic)#

Add a WriteTopic, so it can be closed by close.

Parameters:

topic (topics.WriteTopic) – Write topic to (eventually) close.

Return type:

None

assert_running()#

Raise RuntimeError if not running.

Return type:

None

assert_started()#

Raise RuntimeError if not successfully started.

Return type:

None

Notes

Does not raise after this is closed. That avoids race conditions at shutdown.

basic_close()#

A synchronous and less thorough version of close.

Intended for exit handlers and constructor error handlers.

Return type:

None

async close(cancel_run_kafka_task=True)#

Shut down and clean up resources.

May be called multiple times. The first call closes the SalInfo; subsequent calls wait until the SalInfo is closed.

Parameters:

cancel_run_kafka_task (bool, default: True)

Return type:

None

async flush_loop()#

Constantly call flush to force data to be delivered.

Return type:

None

get_broker_client_configuration()#

Get the broker client configuration.

Returns:

Broker configuration.

Return type:

dict`[`str, str]

get_max_history_for_indexed_component()#

Get the max history size for an indexed component.

Returns:

max_history – The maximum number of historic messages to load.

Return type:

int

Notes

The default value is MAX_HISTORY_READ.

If the LSST_MAX_HISTORY_READ env var is defined, that is used.

If the LSST_HISTORY_READ_CONFIG env var file name is defined and the file exists and the component name is in the file, that value is used.

make_ackcmd(private_seqNum, ack, error=0, result='', timeout=0)#

Make an AckCmdType object from keyword arguments.

Parameters:
  • private_seqNum (int) – Sequence number of command.

  • ack (int) – Acknowledgement code; one of the salobj.SalRetCode.CMD_ constants, such as salobj.SalRetCode.CMD_COMPLETE.

  • error (int) – Error code. Should be 0 unless ack is salobj.SalRetCode.CMD_FAILED

  • result (str) – More information.

  • timeout (float) – Esimated command duration. This should be specified if ack is salobj.SalRetCode.CMD_INPROGRESS.

Raises:

RuntimeError – If the SAL component has no commands (because if there are no commands then there is no ackcmd topic).

Return type:

AckCmdDataType

async start()#

Start the read loop.

Call this after all topics have been added.

Raises:

RuntimeError – If start or close have already been called.

Return type:

None

async write_data(topic_info, data_dict)#

Write a message.

Parameters:
  • topic_info (TopicInfo) – Info for the topic.

  • data_dict (dict[str, Any]) – Message to write, as a dict that matches the Avro topic schema.

Return type:

None