Source code for aio_sf_streaming.mixins

"""
Mixins module: Provide various mixins modules
"""
import asyncio
import enum
import logging
from typing import Union

from .core import JSONList, JSONObject

logger = logging.getLogger("aio_sf_streaming")


class ConnectionError(Exception):
    ...


[docs]class TimeoutAdviceMixin: """ Simple mixin that automatically set timeout setting according to SF advice, if provided. """ async def messages(self) -> JSONObject: """ See :py:func:`BaseSalesforceStreaming.messages` """ async for message in super().messages(): if message.get("channel", "") == "/meta/connect" and "advice" in message: timeout_advice = message["advice"].get("timeout", None) if timeout_advice: self.timeout = timeout_advice / 1000 yield message
[docs]class ReplayType(enum.Enum): """ Enumeration with special replay values """ ALL_EVENTS = -2 #: Replay all events available. NEW_EVENTS = -1 #: No replay, retrieve only new events.
[docs]class ReplayMixin: """ Mixing adding replay support to the streaming client. This mixin is not enough, you must implement :py:func:`ReplayMixin.store_replay_id` and `:py:func:`ReplayMixin.get_last_replay_id` in a subclass in order to have a working replay. """ async def get_handshake_payload(self) -> JSONObject: """ See :py:func:`BaseSalesforceStreaming.get_handshake_payload` """ payload = await super().get_handshake_payload() # Activate replay extension payload.setdefault("ext", {}).update({"replay": True}) return payload async def get_subscribe_payload(self, channel: str) -> JSONObject: """ See :py:func:`BaseSalesforceStreaming.get_subscribe_payload` """ payload = await super().get_subscribe_payload(channel) # Call inner callback to retrieve the last replay id replay_id = await self.get_last_replay_id(channel) # No response => Use only new events (default behavior) if not replay_id: replay_id = ReplayType.NEW_EVENTS # Extract replay value if isinstance(replay_id, ReplayType): replay_id = replay_id.value replay_id = int(replay_id) # Update payload payload.setdefault("ext", {}).setdefault("replay", {}) payload["ext"]["replay"][channel] = replay_id return payload async def messages(self) -> JSONObject: """ See :py:func:`BaseSalesforceStreaming.messages` """ async for message in super().messages(): channel = message["channel"] # On new message, call callback to store replay id if not channel.startswith("/meta/"): event = message["data"]["event"] replay_id = event["replayId"] creation_time = event["createdDate"] # Create a task : do not wait the replay id is stored to # reconnect as soon as possible self.loop.create_task( self.store_replay_id(channel, replay_id, creation_time) ) yield message
[docs] async def store_replay_id( self, channel: str, replay_id: int, creation_time: str ) -> None: """ Callback called to store a replay id. You should override this method to implement your custom logic. :param channel: Channel name :param replay_id: replay id to store :param creation_time: Creation time. You should store only the last created object but you can not know if you received event in order without this. This value is the string provided by SF. """
[docs] async def get_last_replay_id(self, channel: str) -> Union[ReplayType, int]: """ Callback called to retrieve a replay id. You should override this method to implement your custom logic. :param channel: Channel name """
[docs]class AutoVersionMixin: """ Simple mixin that fetch last api version before connect. """ async def handshake(self) -> JSONList: """ See :py:func:`BaseSalesforceStreaming.handshake` """ # Get last api version data = await self.get("/services/data/") try: self.version = data[-1]["version"] except (IndexError, KeyError): pass logger.info("API version used: %r", self.version) return await super().handshake()
[docs]class AutoReconnectMixin: """ Mixin that will automatically reconnect when asked by Salesforce """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Used to store all subscribed channels self._subchannels = None async def start(self) -> None: """ See :py:func:`BaseSalesforceStreaming.start` """ self._subchannels = set() await super().start() async def subscribe(self, channel: str) -> JSONList: """ See :py:func:`BaseSalesforceStreaming.subscribe` """ self._subchannels.add(channel) return await super().subscribe(channel) async def messages(self) -> JSONObject: """ See :py:func:`BaseSalesforceStreaming.messages` """ async for message in super().messages(): channel = message["channel"] # If asked, perform a new handshake if ( channel.startswith("/meta/") and message.get("error") == "403::Unknown client" ): # Need to re-subscribes, not possible with current design, let crash raise ConnectionError() # logger.info("Disconnected, do new handshake") # await self.handshake() # continue yield message async def unsubscribe(self, channel: str) -> JSONList: """ See :py:func:`BaseSalesforceStreaming.unsubscribe` """ self._subchannels.remove(channel) return await super().unsubscribe(channel) async def stop(self) -> None: """ See :py:func:`BaseSalesforceStreaming.stop` """ await super().stop() self._subchannels = None async def handshake(self) -> JSONList: """ See :py:func:`BaseSalesforceStreaming.handshake` """ response = await super().handshake() # If we reconnect, we must re-subscribe to all channels for channel in self._subchannels: self.loop.create_task(super().subscribe(channel)) return response
[docs]class ReSubscribeMixin: """ Mixin that handle subscription error, will try again after a short delay :param retry_sub_duration: Duration between subscribe retry if server is too buzy (initial value). :param retry_factor: Factor amplification between each successive retry :param retry_max_duration: Maximum value of the retry duration :param retry_max_count: Maximum count of retry, after this count is reach, response or exception are propagated. """ def __init__( self, retry_sub_duration: float = 0.1, retry_factor: float = 1.0, retry_max_duration: float = 30.0, retry_max_count: int = 20, **kwargs, ): super().__init__(**kwargs) self.retry_sub_duration = retry_sub_duration self.retry_factor = retry_factor self.retry_max_duration = retry_max_duration self.retry_max_count = retry_max_count self.retry_current_duration = {} self.retry_current_count = {}
[docs] async def should_retry_on_exception( self, channel: str, exception: Exception ) -> bool: """ Callback called to process an exception raised during subscription. Return a boolean if we must retry. If ``False`` is returned, the exception will be propagated to caller. By-default, do return always ``False``. :param channel: Channel name :param exception: The exception raised """ return False
[docs] async def should_retry_on_error_response( self, channel: str, response: JSONObject ) -> bool: """ Callback called to process a response with and error message. Return a boolean if we must retry. If ``False`` is returned, the response will be returned to caller. By-default, retry on known 'server unavailable' response. :param channel: Channel name :param response: The response received """ failure = response[0].get("ext", {}).get("sfdc", {}).get("failureReason", "") return failure.startswith("SERVER_UNAVAILABLE") or failure.startswith( "503::Server is too busy" )
def _update_retry_count(self, channel: str) -> bool: """ Update retry count for the channel. Return a boolean if we should retry """ self.retry_current_count[channel] = self.retry_current_count.get(channel, 0) + 1 if self.retry_current_count[channel] >= self.retry_max_count: return False duration = self.retry_current_duration.get(channel, -1) if duration < 0: duration = self.retry_sub_duration else: duration = min(duration * self.retry_factor, self.retry_max_count) self.retry_current_duration[channel] = duration return True async def subscribe(self, channel: str) -> JSONList: """ See :py:func:`BaseSalesforceStreaming.subscribe` """ while True: try: response = await super().subscribe(channel) except Exception as e: should_retry = await self.should_retry_on_exception(channel, e) if should_retry: should_retry = self._update_retry_count(channel) if not should_retry: raise else: if response and response[0]["successful"]: should_retry = False else: should_retry = await self.should_retry_on_error_response( channel, response ) if should_retry: should_retry = self._update_retry_count(channel) if not should_retry: self.retry_current_duration[channel] = -1 self.retry_current_count[channel] = 0 return response await asyncio.sleep(self.retry_current_duration[channel])
[docs]class AllMixin( TimeoutAdviceMixin, # Use SF timeout advice AutoVersionMixin, # Auto-fetch last api version ReplayMixin, # Add replay support AutoReconnectMixin, # Add auto-reconnection feature ReSubscribeMixin, ): # Handle subscription errors """ Helper class to add all mixin with one class """