SalInfo

class lsst.ts.salobj.SalInfo(domain: Domain, name: str, index: int | None = 0, write_only: bool = False, num_messages: int = 1, consume_messages_timeout: float = 0.1)

Bases: object

Information for one SAL component and index.

Parameters:
domain : Domain

Domain information.

name : str

SAL component name.

index : int, optional

Component index; 0 or None if this component is not indexed.

write_only : bool

If True this SalInfo will not subscribe to any topics.

num_messages : int

Number of messages to consume in the read loop.

consume_messages_timeout : float

Timeout to wait for new messages to arrive in the read loop.

Attributes:
domain : Domain

The domain constructor argument.

index : int

The index constructor argument.

num_messages : int

Number of messages to consume in the read loop.

consume_messages_timeout : float

Timeout to wait for new messages to arrive in the read loop.

identity : str

Value used for the private_identity field of DDS messages. Defaults to username@host, but CSCs should use the CSC name:

  • SAL_component_name for a non-indexed SAL component

  • SAL_component_name:index for an indexed SAL component

isopen : bool

Is this SalInfo open? True until close is called.

log : logging.Logger

A logger.

start_called : bool

Has the start method been called? This instance is fully started when start_task is done.

done_task : asyncio.Task

A task which is finished when close is done.

start_task : asyncio.Task

A task which is finished when start is done, or set to an exception if start fails.

command_names : List [str]

A tuple of command names, without the "command_" prefix.

event_names : List [str]

A tuple of event names, without the "logevent_" prefix.

telemetry_names : List [str]

A tuple of telemetry topic names.

sal_topic_names : List [str]

A tuple of SAL topic names, e.g. “logevent_summaryState”, in alphabetical order. This is needed to determine command ID.

component_info : ComponentInfo

Information about the SAL component and its topics.

Raises:
TypeError

If domain is not an instance of Domain or if index is not an int, enum.IntEnum, or None.

ValueError

If index is nonzero and the component is not indexed.
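The two error conditions above can be sketched as a standalone check. This is a minimal illustration, not the library's code: the helper name `validate_index` and its `indexed` argument are hypothetical, and mapping None to 0 is an assumption based on the statement that both mean "not indexed".

```python
import enum


def validate_index(index, indexed):
    """Hypothetical helper mirroring the documented TypeError/ValueError rules.

    index: int, enum.IntEnum, or None (anything else is rejected).
    indexed: whether the SAL component is indexed (assumed to be known).
    """
    # enum.IntEnum is a subclass of int, so one isinstance check covers both.
    if index is not None and not isinstance(index, int):
        raise TypeError(f"index={index!r} must be an int, enum.IntEnum, or None")
    # Assumption: None is treated the same as 0.
    normalized = 0 if index is None else index
    if normalized != 0 and not indexed:
        raise ValueError(f"index={index!r} must be 0 or None; component is not indexed")
    return normalized


class DemoIndex(enum.IntEnum):
    """Illustrative enum; not part of salobj."""

    FIRST = 1


checked = [
    validate_index(None, indexed=False),
    validate_index(DemoIndex.FIRST, indexed=True),
]
```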

Notes

Reads the following environment variables; see the Environment Variables documentation for details:

  • LSST_TOPIC_SUBNAME (required): a component of Kafka topic names and schema namespaces.

  • LSST_DDS_ENABLE_AUTHLIST (optional): if set to “1” enable authlist-based command authorization. If “0” or undefined, do not enable authorization.

  • LSST_KAFKA_BROKER_ADDR (optional): address of Kafka broker. Defaults to broker:29092 (matching the value in file docker-compose.yaml), for unit tests.

  • LSST_SCHEMA_REGISTRY_URL (optional): URL of the Confluent schema registry. Defaults to http://schema-registry:8081 (matching the value in file docker-compose.yaml), for unit tests.

  • LSST_KAFKA_SECURITY_USERNAME (optional): Username to authenticate with the Kafka broker.

  • LSST_KAFKA_SECURITY_PASSWORD (optional): Password to authenticate with the Kafka broker.

  • LSST_KAFKA_SECURITY_PROTOCOL (optional): Authentication protocol to use with the Kafka broker.

  • LSST_KAFKA_SECURITY_MECHANISM (optional): Authentication mechanism to use with the Kafka broker.

  • LSST_KAFKA_REPLICATION_FACTOR (optional): Replication factor to use when creating topics.

  • LSST_KAFKA_PRODUCER_WAIT_ACKS (optional): The number of acknowledgments the producer requires the leader to have received before considering a request complete.
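As a rough sketch of how the required and optional variables in this list combine, the following reads a few of them with the defaults stated above. The function name `read_kafka_settings`, the dictionary keys, and the exception type raised for a missing LSST_TOPIC_SUBNAME are all assumptions made for illustration; SalInfo itself may handle these differently.

```python
import os


def read_kafka_settings(environ=None):
    """Hypothetical reader for a subset of the variables listed above."""
    env = os.environ if environ is None else environ

    # LSST_TOPIC_SUBNAME is documented as required.
    # Assumption: an unset or empty value is reported with RuntimeError.
    subname = env.get("LSST_TOPIC_SUBNAME")
    if not subname:
        raise RuntimeError("Environment variable LSST_TOPIC_SUBNAME must be set")

    return {
        "topic_subname": subname,
        # Documented defaults, matching docker-compose.yaml for unit tests.
        "broker_addr": env.get("LSST_KAFKA_BROKER_ADDR", "broker:29092"),
        "schema_registry_url": env.get(
            "LSST_SCHEMA_REGISTRY_URL", "http://schema-registry:8081"
        ),
        # Authorization is enabled only when the value is exactly "1".
        "enable_authlist": env.get("LSST_DDS_ENABLE_AUTHLIST", "0") == "1",
    }


# Pass an explicit mapping so the example does not depend on the real environment.
settings = read_kafka_settings({"LSST_TOPIC_SUBNAME": "test"})
```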

Usage

  • Construct a SalInfo object for a particular SAL component and index.

  • Use the object to construct all topics (subclasses of topics.BaseTopic) that you want to use with this SAL component and index.

  • Call start.

  • When you are finished, call close, or at least be sure to close the domain when you are finished with all classes that use it (see Cleanup below).

You cannot read or write topics constructed with a SalInfo object until you call start, and once you call start, you cannot use the SalInfo object to construct any more topics.

You may use SalInfo as an async context manager, but this is primarily useful for cleanup. After you enter the context (create the object) you will still have to create topics and call start. This is different from Domain, Controller, and Remote, which are ready to use when you enter the context.
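The ordering rules in this section (add topics, then start, then close) can be modeled with a small stand-in class. This is a toy model only: `LifecycleSketch` is hypothetical, its methods are synchronous whereas the real start and close are coroutines, and the topic names are illustrative.

```python
class LifecycleSketch:
    """Toy model of the construct -> add topics -> start -> close ordering."""

    def __init__(self):
        self.start_called = False
        self.isopen = True
        self.readers = []

    def add_reader(self, topic_name):
        # Topics may only be added before start is called.
        if self.start_called:
            raise RuntimeError("Cannot add topics after start has been called")
        self.readers.append(topic_name)

    def start(self):
        # start may be called once, and only while still open.
        if self.start_called or not self.isopen:
            raise RuntimeError("start or close has already been called")
        self.start_called = True

    def close(self):
        self.isopen = False

    @property
    def running(self):
        # "Running" means started and not yet closed.
        return self.start_called and self.isopen


info = LifecycleSketch()
info.add_reader("evt_summaryState")  # illustrative topic name
info.start()
was_running = info.running

late_add_error = None
try:
    info.add_reader("tel_scalars")  # too late: start was already called
except RuntimeError as exc:
    late_add_error = str(exc)

info.close()
```

With the real class, the same sequence would be awaited inside a running event loop, and closing the domain remains necessary in any case.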

Cleanup

Each SalInfo automatically registers itself with the specified domain for cleanup, using a weak reference to avoid circular dependencies. You may safely close a SalInfo before closing its domain, and this is recommended if you create and destroy many remotes. In any case, be sure to close the domain when you are done.

Attributes Summary

AckCmdType

The class of command acknowledgement.

indexed

Is this SAL component indexed?

name

Get the SAL component name (the name constructor argument).

name_index

Get name[:index].

running

Return True if started and not closed.

started

Return True if successfully started, False otherwise.

Methods Summary

add_reader(topic)

Add a ReadTopic, so it can be read by the read loop and closed by close.

add_writer(topic)

Add a WriteTopic, so it can be closed by close.

assert_running()

Raise RuntimeError if not running.

assert_started()

Raise RuntimeError if not successfully started.

basic_close()

A synchronous and less thorough version of close.

close([cancel_run_kafka_task])

Shut down and clean up resources.

flush_loop()

Constantly call flush to force data to be delivered.

get_broker_client_configuration()

Get the broker client configuration.

make_ackcmd(private_seqNum, ack[, error, ...])

Make an AckCmdType object from keyword arguments.

start()

Start the read loop.

write_data(topic_info, data_dict)

Write a message.

Attributes Documentation

AckCmdType

The class of command acknowledgement.

It includes these fields, as well as the usual other private fields.

private_seqNum : int

Sequence number of command.

ack : int

Acknowledgement code; one of the SalRetCode CMD_ constants, such as SalRetCode.CMD_COMPLETE.

error : int

Error code; 0 for no error.

result : str

Explanatory message, or “” for no message.

Raises:
RuntimeError

If the SAL component has no commands (because if there are no commands then there is no ackcmd topic).

indexed

Is this SAL component indexed?

name

Get the SAL component name (the name constructor argument).

name_index

Get name[:index].

The suffix is only present if the component is indexed.
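A minimal sketch of the name[:index] format, assuming the suffix depends on whether the component is indexed rather than on the index value. The helper `format_name_index` and the component names are hypothetical.

```python
def format_name_index(name, index, indexed):
    """Return name:index for an indexed component, else just name."""
    return f"{name}:{index}" if indexed else name


indexed_label = format_name_index("Test", 5, indexed=True)
plain_label = format_name_index("ATDome", 0, indexed=False)
```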

running

Return True if started and not closed.

started

Return True if successfully started, False otherwise.

Methods Documentation

add_reader(topic: ReadTopic) → None

Add a ReadTopic, so it can be read by the read loop and closed by close.

Parameters:
topic : topics.ReadTopic

Topic to read and (eventually) close.

Raises:
RuntimeError

If called after start has been called.

add_writer(topic: WriteTopic) → None

Add a WriteTopic, so it can be closed by close.

Parameters:
topic : topics.WriteTopic

Write topic to (eventually) close.

assert_running() → None

Raise RuntimeError if not running.

assert_started() → None

Raise RuntimeError if not successfully started.

Notes

Does not raise after this is closed. That avoids race conditions at shutdown.

basic_close() → None

A synchronous and less thorough version of close.

Intended for exit handlers and constructor error handlers.

async close(cancel_run_kafka_task: bool = True) → None

Shut down and clean up resources.

May be called multiple times. The first call closes the SalInfo; subsequent calls wait until the SalInfo is closed.
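The "first call closes, later calls wait" behavior is a common asyncio pattern. The sketch below shows one way to get it with a future that completes when cleanup is done; `IdempotentCloser` is a hypothetical stand-in, not the SalInfo implementation, and it uses a plain asyncio.Future where SalInfo documents done_task as an asyncio.Task.

```python
import asyncio


class IdempotentCloser:
    """Hypothetical stand-in for the 'close once, wait thereafter' pattern."""

    def __init__(self):
        self.isopen = True
        self.cleanup_count = 0
        self.done_task = None  # created on first close, inside a running loop

    async def close(self):
        if not self.isopen:
            # A later call: wait for the first call to finish its cleanup.
            await self.done_task
            return
        self.isopen = False
        self.done_task = asyncio.get_running_loop().create_future()
        try:
            self.cleanup_count += 1
            await asyncio.sleep(0)  # stand-in for real asynchronous cleanup
        finally:
            # Release any waiters even if cleanup raised.
            if not self.done_task.done():
                self.done_task.set_result(None)


async def close_three_times():
    closer = IdempotentCloser()
    # Three concurrent calls; only the first performs cleanup.
    await asyncio.gather(closer.close(), closer.close(), closer.close())
    return closer.cleanup_count


cleanup_count = asyncio.run(close_three_times())
```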

async flush_loop() → None

Constantly call flush to force data to be delivered.

get_broker_client_configuration() → dict[str, Any]

Get the broker client configuration.

Returns:
dict [str, str]

Broker configuration.

make_ackcmd(private_seqNum: int, ack: SalRetCode, error: int = 0, result: str = '', timeout: float = 0) → AckCmdDataType

Make an AckCmdType object from keyword arguments.

Parameters:
private_seqNum : int

Sequence number of command.

ack : int

Acknowledgement code; one of the salobj.SalRetCode.CMD_ constants, such as salobj.SalRetCode.CMD_COMPLETE.

error : int

Error code. Should be 0 unless ack is salobj.SalRetCode.CMD_FAILED.

result : str

More information.

timeout : float

Estimated command duration. This should be specified if ack is salobj.SalRetCode.CMD_INPROGRESS.

Raises:
RuntimeError

If the SAL component has no commands (because if there are no commands then there is no ackcmd topic).
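To show how the parameters are meant to combine, here is a sketch using stand-in types. Everything in it is hypothetical: `AckCode` uses placeholder numeric values that are not claimed to match salobj.SalRetCode, `AckCmdSketch` is not the real AckCmdType, and storing timeout as a field is an assumption.

```python
import enum
from dataclasses import dataclass


class AckCode(enum.IntEnum):
    """Placeholder codes; the numeric values are illustrative only."""

    CMD_INPROGRESS = 1
    CMD_COMPLETE = 2
    CMD_FAILED = 3


@dataclass
class AckCmdSketch:
    """Stand-in for an acknowledgement message."""

    private_seqNum: int
    ack: int
    error: int = 0
    result: str = ""
    timeout: float = 0.0  # assumption: kept alongside the documented fields


def make_ackcmd_sketch(private_seqNum, ack, error=0, result="", timeout=0.0):
    """Build an acknowledgement from keyword arguments, as make_ackcmd does."""
    return AckCmdSketch(
        private_seqNum=private_seqNum,
        ack=int(ack),
        error=error,
        result=result,
        timeout=timeout,
    )


# In progress: supply an estimated duration via timeout.
in_progress = make_ackcmd_sketch(7, AckCode.CMD_INPROGRESS, timeout=12.5)
# Failed: the only case where a nonzero error code is expected.
failed = make_ackcmd_sketch(7, AckCode.CMD_FAILED, error=1, result="Demo failure")
# Complete: error stays at its default of 0.
complete = make_ackcmd_sketch(7, AckCode.CMD_COMPLETE)
```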

async start() → None

Start the read loop.

Call this after all topics have been added.

Raises:
RuntimeError

If start or close has already been called.

async write_data(topic_info: TopicInfo, data_dict: dict[str, Any]) → None

Write a message.

Parameters:
topic_info : TopicInfo

Info for the topic.

data_dict : dict[str, Any]

Message to write, as a dict that matches the Avro topic schema.