Source code for aio_sf_streaming.connectors
"""
Connectors module: Provide authentication implementation
"""
from typing import Tuple
import aiohttp
from .core import BaseSalesforceStreaming
[docs]class BaseConnector(BaseSalesforceStreaming):
"""
Base class for all sf connectors.
:param client_id: OAuth2 client Id (mandatory)
:param client_secret: Oauth2 client secret (mandatory)
:param login_connector: aiohttp connector used during connection.
Mainly used for test purpose.
See :class:`.BaseSalesforceStreaming` for other keywords arguments.
"""
def __init__(
self,
*,
client_id: str = None,
client_secret: str = None,
login_connector: aiohttp.BaseConnector = None,
**kwargs
):
if any(v is None for v in (client_id, client_secret)):
raise TypeError("All credentials arguments are mandatory")
self.login_connector = login_connector
self.credentials = {"client_id": client_id, "client_secret": client_secret}
super().__init__(**kwargs)
async def fetch_token(self) -> Tuple[str, str]:
# use a temporary session only to fetch token because client session
# does not seems to allow update default headers on a already created
# session
async with aiohttp.ClientSession(
connector=self.login_connector, headers=self.base_header, loop=self.loop
) as session:
async with session.post(self.token_url, data=self.credentials) as resp:
data = await resp.json()
assert data["token_type"] == "Bearer"
instance_url = data["instance_url"]
access_token = data["access_token"]
return access_token, instance_url
[docs]class PasswordSalesforceStreaming(BaseConnector):
"""
Create a SF streaming manager with password flow connection.
Main arguments are connection credentials:
:param username: User login name
:param password: User password
See :class:`.BaseConnector` for other keywords arguments.
"""
def __init__(self, *, username: str = None, password: str = None, **kwargs):
if any(v is None for v in (username, password)):
raise TypeError("All credentials arguments are mandatory")
super().__init__(**kwargs)
# Credentials used to fetch access token
self.credentials.update(
{"grant_type": "password", "username": username, "password": password}
)
[docs]class RefreshTokenSalesforceStreaming(BaseConnector):
"""
Create a SF streaming manager with password refresh token connection.
Main arguments are connection credentials:
:param refresh_token: Refresh token
See :class:`.BaseConnector` for other keywords arguments.
"""
def __init__(self, *, refresh_token: str = None, **kwargs):
if refresh_token is None:
raise TypeError("All credentials arguments are mandatory")
super().__init__(**kwargs)
# Credentials used to fetch access token
self.credentials.update(
{"grant_type": "refresh_token", "refresh_token": refresh_token}
)