Source code for aio_sf_streaming

"""
aio_sf_streaming
~~~~~~~~~~~~~~~~

aio_sf_streaming is a simple asyncio Salesforce Streaming API client for
Python 3.6+
"""
import asyncio
import aiohttp
from .core import BaseSalesforceStreaming
from .connectors import (
    BaseConnector,
    PasswordSalesforceStreaming,
    RefreshTokenSalesforceStreaming,
)
from .mixins import (
    TimeoutAdviceMixin,
    AutoVersionMixin,
    ReplayMixin,
    ReplayType,
    AutoReconnectMixin,
    ReSubscribeMixin,
    AllMixin,
)
from .__version__ import __version__


class SimpleSalesforceStreaming(AllMixin, PasswordSalesforceStreaming):  # Password flow
    """
    A simple helper class providing all-in-one functionalities.

    This class only combines ``AllMixin`` with ``PasswordSalesforceStreaming``
    and forwards every argument, unchanged, to the parent initializer.

    :param username: User login name
    :param password: User password
    :param client_id: OAuth2 client Id
    :param client_secret: OAuth2 client secret
    :param sandbox: If ``True``, the connection will be made on a sandbox,
        from ``https://test.salesforce.com`` instead of the main login
        route at ``https://login.salesforce.com``.
    :param version: The API version to use. For example ``'42.0'``.
    :param loop: Asyncio loop used
    :param connector: ``aiohttp`` connector used for main session. Mainly
        used for test purpose.
    :param login_connector: ``aiohttp`` connector used during connection.
        Mainly used for test purpose.
    :param retry_sub_duration: Duration between subscribe retry if server is
        too busy.
    :param retry_factor: Factor amplification between each successive retry
    :param retry_max_duration: Maximum value of the retry duration
    :param retry_max_count: Maximum count of retry, after this count is
        reached, response or exception are propagated.

    **Usage example**::

        class MyClient(SimpleSalesforceStreaming):
            def __init__(self):
                self.replays = []
                super().__init__(username='my-username',
                                 password='my-password',
                                 client_id='my-client-id',
                                 client_secret='my-client-secret')

            async def store_replay_id(self, channel, replay_id, creation_time):
                # We only store replay id without any use
                self.replays.append((channel, replay_id, creation_time))

            async def get_last_replay_id(self, channel):
                # We ask for only use new events
                return ReplayType.NEW_EVENTS

        async def print_events():
            async with MyClient() as client:
                await client.subscribe('/topic/Foo')
                async for message in client.events():
                    channel = message['channel']
                    print(f"Message received on {channel} : {message}")

        loop = asyncio.get_event_loop()
        loop.run_until_complete(print_events())
    """

    def __init__(
        self,
        username: str,
        password: str,
        client_id: str,
        client_secret: str,
        *,
        sandbox: bool = False,
        version: str = "42.0",
        loop: asyncio.AbstractEventLoop = None,
        connector: aiohttp.BaseConnector = None,
        login_connector: aiohttp.BaseConnector = None,
        retry_sub_duration: float = 0.1,
        retry_factor: float = 1.0,
        retry_max_duration: float = 30.0,
        retry_max_count: int = 20,
    ) -> None:
        # Pure pass-through: the explicit signature exists so that all the
        # options accepted along the mixin/connector chain are visible and
        # documented in a single place.
        super().__init__(
            username=username,
            password=password,
            client_id=client_id,
            client_secret=client_secret,
            sandbox=sandbox,
            version=version,
            loop=loop,
            connector=connector,
            login_connector=login_connector,
            retry_sub_duration=retry_sub_duration,
            retry_factor=retry_factor,
            retry_max_duration=retry_max_duration,
            retry_max_count=retry_max_count,
        )
class SimpleRefreshTokenSalesforceStreaming(
    AllMixin, RefreshTokenSalesforceStreaming
):  # Refresh token flow
    """
    A simple helper class providing all-in-one functionalities.

    This class only combines ``AllMixin`` with
    ``RefreshTokenSalesforceStreaming`` and forwards every argument,
    unchanged, to the parent initializer.

    :param refresh_token: Refresh token
    :param client_id: OAuth2 client Id
    :param client_secret: OAuth2 client secret
    :param sandbox: If ``True``, the connection will be made on a sandbox,
        from ``https://test.salesforce.com`` instead of the main login
        route at ``https://login.salesforce.com``.
    :param version: The API version to use. For example ``'42.0'``.
    :param loop: Asyncio loop used
    :param connector: ``aiohttp`` connector used for main session. Mainly
        used for test purpose.
    :param login_connector: ``aiohttp`` connector used during connection.
        Mainly used for test purpose.
    :param retry_sub_duration: Duration between subscribe retry if server is
        too busy.
    :param retry_factor: Factor amplification between each successive retry
    :param retry_max_duration: Maximum value of the retry duration
    :param retry_max_count: Maximum count of retry, after this count is
        reached, response or exception are propagated.

    **Usage example**::

        class MyClient(SimpleRefreshTokenSalesforceStreaming):
            def __init__(self):
                self.replays = []
                super().__init__(refresh_token='refresh_token',
                                 client_id='my-client-id',
                                 client_secret='my-client-secret')

            async def store_replay_id(self, channel, replay_id, creation_time):
                # We only store replay id without any use
                self.replays.append((channel, replay_id, creation_time))

            async def get_last_replay_id(self, channel):
                # We ask for only use new events
                return ReplayType.NEW_EVENTS

        async def print_events():
            async with MyClient() as client:
                await client.subscribe('/topic/Foo')
                async for message in client.events():
                    channel = message['channel']
                    print(f"Message received on {channel} : {message}")

        loop = asyncio.get_event_loop()
        loop.run_until_complete(print_events())
    """

    def __init__(
        self,
        refresh_token: str,
        client_id: str,
        client_secret: str,
        *,
        sandbox: bool = False,
        version: str = "42.0",
        loop: asyncio.AbstractEventLoop = None,
        connector: aiohttp.BaseConnector = None,
        login_connector: aiohttp.BaseConnector = None,
        retry_sub_duration: float = 0.1,
        retry_factor: float = 1.0,
        retry_max_duration: float = 30.0,
        retry_max_count: int = 20,
    ) -> None:
        # Pure pass-through: the explicit signature exists so that all the
        # options accepted along the mixin/connector chain are visible and
        # documented in a single place.
        super().__init__(
            refresh_token=refresh_token,
            client_id=client_id,
            client_secret=client_secret,
            sandbox=sandbox,
            version=version,
            loop=loop,
            connector=connector,
            login_connector=login_connector,
            retry_sub_duration=retry_sub_duration,
            retry_factor=retry_factor,
            retry_max_duration=retry_max_duration,
            retry_max_count=retry_max_count,
        )