prefect.server.utilities.messaging.memory

Functions

log_metrics_periodically

log_metrics_periodically(interval: float = 2.0) -> None

update_metric

update_metric(topic: str, key: str, amount: int = 1) -> None

break_topic

break_topic()

ephemeral_subscription

ephemeral_subscription(topic: str) -> AsyncGenerator[Mapping[str, Any], None]

Classes

MemoryMessage

Subscription

A subscription to a topic. Messages are delivered to the subscription’s queue and retried up to a maximum number of times. If a message cannot be delivered after the maximum number of retries it is moved to the dead letter queue. The dead letter queue is a directory of JSON files containing the serialized message. Messages remain in the dead letter queue until they are removed manually. Methods:

deliver

deliver(self, message: MemoryMessage) -> None
Deliver a message to the subscription’s queue. Args:
  • message: The message to deliver.

get

get(self) -> MemoryMessage
Get a message from the subscription’s queue.

retry

retry(self, message: MemoryMessage) -> None
Place a message back on the retry queue. If the message has retried more than the maximum number of times it is moved to the dead letter queue. Args:
  • message: The message to retry.

send_to_dead_letter_queue

send_to_dead_letter_queue(self, message: MemoryMessage) -> None
Send a message to the dead letter queue. The dead letter queue is a directory of JSON files containing the serialized messages. Args:
  • message: The message to send to the dead letter queue.

Topic

Methods:

by_name

by_name(cls, name: str) -> 'Topic'

clear

clear(self) -> None

clear_all

clear_all(cls) -> None

publish

publish(self, message: MemoryMessage) -> None

subscribe

subscribe(self, **subscription_kwargs: Any) -> Subscription

unsubscribe

unsubscribe(self, subscription: Subscription) -> None

Cache

Methods:

clear_recently_seen_messages

clear_recently_seen_messages(self) -> None

forget_duplicates

forget_duplicates(self, attribute: str, messages: Iterable[M]) -> None

without_duplicates

without_duplicates(self, attribute: str, messages: Iterable[M]) -> list[M]

Publisher

Methods:

publish_data

publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None

Consumer

Methods:

run

run(self, handler: MessageHandler) -> None