Developer Interface
This part of the documentation covers all the interfaces of aio_sf_streaming.
Code organization
The package is separated into 5 main modules:
High-level classes and helpers are provided to get a functional client quickly. See the Main Interface section.
The Base class section describes the low-level base class that implements the main client logic.
To authenticate on Salesforce, you must use a connector that adds authentication capability to BaseSalesforceStreaming. See the Connectors section for a list of available connectors.
Finally, mixins extend BaseSalesforceStreaming capabilities and can easily be added as opt-in options by subclassing.
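The opt-in mechanism can be pictured with a small, self-contained sketch. `ToyBase`, `LoggingMixin` and `ToyClient` are hypothetical stand-ins and not part of aio_sf_streaming; the sketch only assumes that mixins cooperate with the base class through `super()`, which is the usual Python pattern.

```python
import asyncio


class ToyBase:
    """Hypothetical stand-in for a base streaming class."""

    def __init__(self):
        self.log = []

    async def subscribe(self, channel):
        self.log.append(f"base subscribe {channel}")
        return [{"channel": "/meta/subscribe", "successful": True}]


class LoggingMixin:
    """Hypothetical mixin: extends subscribe() and delegates with super()."""

    async def subscribe(self, channel):
        self.log.append(f"mixin before {channel}")
        response = await super().subscribe(channel)
        self.log.append(f"mixin after {channel}")
        return response


class ToyClient(LoggingMixin, ToyBase):
    """Mixins are listed first so their overrides run before the base class."""


async def demo():
    client = ToyClient()
    await client.subscribe("/topic/Foo")
    return client.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Listing the mixin before the base class places it earlier in the method resolution order, so its `subscribe()` wraps the base implementation.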
Main Interface
-
class aio_sf_streaming.SimpleSalesforceStreaming(username, password, client_id, client_secret, *, sandbox=False, version='42.0', loop=None, connector=None, login_connector=None, retry_sub_duration=0.1, retry_factor=1.0, retry_max_duration=30.0, retry_max_count=20)
A simple helper class providing all-in-one functionality.
- Parameters
username (str) – User login name
password (str) – User password
client_id (str) – OAuth2 client id
client_secret (str) – OAuth2 client secret
sandbox (bool) – If True, the connection will be made on a sandbox, from https://test.salesforce.com instead of the main login route at https://login.salesforce.com.
version (str) – The API version to use. For example '42.0'.
loop (Optional[AbstractEventLoop]) – Asyncio loop used
connector (Optional[BaseConnector]) – aiohttp connector used for the main session. Mainly used for testing purposes.
login_connector (Optional[BaseConnector]) – aiohttp connector used during connection. Mainly used for testing purposes.
retry_sub_duration (float) – Duration between subscribe retries if the server is too busy.
retry_factor (float) – Amplification factor between successive retries.
retry_max_duration (float) – Maximum value of the retry duration.
retry_max_count (int) – Maximum number of retries. After this count is reached, the response or exception is propagated.
Usage example:
    import asyncio

    from aio_sf_streaming import ReplayType, SimpleSalesforceStreaming


    class MyClient(SimpleSalesforceStreaming):
        def __init__(self):
            self.replays = []
            super().__init__(username='my-username',
                             password='my-password',
                             client_id='my-client-id',
                             client_secret='my-client-secret')

        async def store_replay_id(self, channel, replay_id, creation_time):
            # We only store the replay id, without using it
            self.replays.append((channel, replay_id, creation_time))

        async def get_last_replay_id(self, channel):
            # We only ask for new events
            return ReplayType.NEW_EVENTS


    async def print_events():
        async with MyClient() as client:
            await client.subscribe('/topic/Foo')
            async for message in client.events():
                channel = message['channel']
                print(f"Message received on {channel} : {message}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_events())
SimpleSalesforceStreaming inherits all members from its base classes. Only the main ones, intended for external use, are listed here.
-
coroutine start(self)
See BaseSalesforceStreaming.start().
- Return type
None
-
coroutine subscribe(self, channel)
See BaseSalesforceStreaming.subscribe().
- Return type
List[Dict[str, Any]]
-
async-for messages(self)
See BaseSalesforceStreaming.messages().
- Return type
Dict[str, Any]
-
async-for events(self)
Asynchronous generator that fetches new events and returns one as soon as it is available:
    async for message in client.events():
        channel = message['channel']
        print(channel, ':', message)
This method differs from BaseSalesforceStreaming.messages() because it filters messages and provides only those related to the channels you subscribed to.
- Return type
Dict[str, Any]
-
coroutine store_replay_id(self, channel, replay_id, creation_time)
Callback called to store a replay id. You should override this method to implement your custom logic.
- Parameters
channel (str) – Channel name
replay_id (int) – Replay id to store
creation_time (str) – Creation time. You should store only the replay id of the last created object, but without this value you cannot know whether events were received in order. This value is the string provided by SF.
- Return type
None
-
coroutine get_last_replay_id(self, channel)
Callback called to retrieve a replay id. You should override this method to implement your custom logic.
- Parameters
channel (str) – Channel name
- Return type
Union[ReplayType, int]
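A minimal sketch of the two callbacks, keeping only the most recent replay id per channel. `InMemoryReplayStore` and `parse_sf_time` are hypothetical names, and the timestamp format is an assumption about the string Salesforce provides; adapt the parsing to what your events actually carry. In a real client these coroutines would be methods of your subclass.

```python
import asyncio
from datetime import datetime

NEW_EVENTS = -1  # documented value of ReplayType.NEW_EVENTS


def parse_sf_time(value):
    # Assumed timestamp format, e.g. '2018-03-20T14:50:39.120Z'
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


class InMemoryReplayStore:
    """Hypothetical helper keeping the newest replay id per channel."""

    def __init__(self):
        self._latest = {}  # channel -> (creation datetime, replay id)

    async def store_replay_id(self, channel, replay_id, creation_time):
        created = parse_sf_time(creation_time)
        current = self._latest.get(channel)
        # Ignore events that arrive out of order
        if current is None or created > current[0]:
            self._latest[channel] = (created, replay_id)

    async def get_last_replay_id(self, channel):
        current = self._latest.get(channel)
        if current is None:
            # Nothing stored yet: ask for new events only
            return NEW_EVENTS
        return current[1]
```

Comparing creation times rather than arrival order is what makes the store robust to events delivered out of sequence.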
-
coroutine ask_stop(self)
Ask the client to stop receiving events:
    async for event in client.events():
        ...
        if ...:
            await client.ask_stop()
This call will eventually stop the BaseSalesforceStreaming.messages() and BaseSalesforceStreaming.events() async generators, but this can take some time if it is not called inside the loop body: the generator will wait for a timeout response from the Salesforce server.
- Return type
None
-
coroutine unsubscribe(self, channel)
See BaseSalesforceStreaming.unsubscribe().
- Return type
List[Dict[str, Any]]
-
coroutine stop(self)
See BaseSalesforceStreaming.stop().
- Return type
None
-
class aio_sf_streaming.SimpleRefreshTokenSalesforceStreaming(refresh_token, client_id, client_secret, *, sandbox=False, version='42.0', loop=None, connector=None, login_connector=None, retry_sub_duration=0.1, retry_factor=1.0, retry_max_duration=30.0, retry_max_count=20)
A simple helper class providing all-in-one functionality.
- Parameters
refresh_token (str) – Refresh token
client_id (str) – OAuth2 client id
client_secret (str) – OAuth2 client secret
sandbox (bool) – If True, the connection will be made on a sandbox, from https://test.salesforce.com instead of the main login route at https://login.salesforce.com.
version (str) – The API version to use. For example '42.0'.
loop (Optional[AbstractEventLoop]) – Asyncio loop used
connector (Optional[BaseConnector]) – aiohttp connector used for the main session. Mainly used for testing purposes.
login_connector (Optional[BaseConnector]) – aiohttp connector used during connection. Mainly used for testing purposes.
retry_sub_duration (float) – Duration between subscribe retries if the server is too busy.
retry_factor (float) – Amplification factor between successive retries.
retry_max_duration (float) – Maximum value of the retry duration.
retry_max_count (int) – Maximum number of retries. After this count is reached, the response or exception is propagated.
Usage example:
    import asyncio

    from aio_sf_streaming import ReplayType, SimpleRefreshTokenSalesforceStreaming


    class MyClient(SimpleRefreshTokenSalesforceStreaming):
        def __init__(self):
            self.replays = []
            super().__init__(refresh_token='refresh_token',
                             client_id='my-client-id',
                             client_secret='my-client-secret')

        async def store_replay_id(self, channel, replay_id, creation_time):
            # We only store the replay id, without using it
            self.replays.append((channel, replay_id, creation_time))

        async def get_last_replay_id(self, channel):
            # We only ask for new events
            return ReplayType.NEW_EVENTS


    async def print_events():
        async with MyClient() as client:
            await client.subscribe('/topic/Foo')
            async for message in client.events():
                channel = message['channel']
                print(f"Message received on {channel} : {message}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_events())
SimpleRefreshTokenSalesforceStreaming inherits all members from its base classes. Only the main ones, intended for external use, are listed here. See SimpleSalesforceStreaming for method descriptions.
Base class
-
class aio_sf_streaming.BaseSalesforceStreaming(*, sandbox=False, version='42.0', loop=None, connector=None)
Base low-level aio-sf-streaming class.
Cannot be used directly: it must be subclassed with at least one connector implementation. The class provides basic functionality. Additional functionality can be added with the provided mixins.
The main logic is implemented here but you should not use it directly.
- Parameters
sandbox (bool) – If True, the connection will be made on a sandbox, from https://test.salesforce.com instead of the main login route at https://login.salesforce.com.
version (str) – The API version to use. For example '42.0'.
loop (Optional[AbstractEventLoop]) – Asyncio loop used
connector (Optional[BaseConnector]) – aiohttp connector used for the main session. Mainly used for testing purposes.
This class supports the async context manager protocol and closes itself on exit.
All main members are coroutines, even if the default implementation does not make any asynchronous call. With this convention, subclasses and mixins can easily override these members and make complex calls.
See SimpleSalesforceStreaming for a usage example.
High level API
-
coroutine start(self)
Connect to Salesforce, authenticate and initialize the CometD connection.
A best practice is to use the async context manager interface, which calls this method for you.
- Return type
None
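To make the context manager advice concrete, here is a toy sketch of how `async with` can wrap `start()` and `stop()`. `ToyStreaming` is a hypothetical class written for illustration; it only mirrors the behavior described here and is not the library's implementation.

```python
import asyncio


class ToyStreaming:
    """Hypothetical class showing how `async with` can wrap start()/stop()."""

    def __init__(self):
        self.calls = []

    async def start(self):
        self.calls.append("start")

    async def stop(self):
        self.calls.append("stop")

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even when the body raises, so the connection is always closed
        await self.stop()
        return False  # do not swallow exceptions


async def demo():
    client = ToyStreaming()
    try:
        async with client:
            client.calls.append("body")
            raise RuntimeError("processing failed")
    except RuntimeError:
        client.calls.append("error seen")
    return client.calls
```

The benefit over calling `start()` and `stop()` by hand is that the cleanup step still runs when the body fails.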
-
coroutine subscribe(self, channel)
Subscribe to a channel. Can be used directly:
    await client.subscribe('/topic/Foo')
This method, and the underlying protocol, are safe to start as a background task:
    loop.create_task(client.subscribe('/topic/Foo'))
- Return type
List[Dict[str, Any]]
-
async-for messages(self)
Asynchronous generator that fetches new messages and returns one as soon as it is available:
    async for message in client.messages():
        channel = message['channel']
        print(channel, ':', message)
This method iterates over all messages, including internal/meta ones. If you only want to iterate over messages from channels you subscribed to, you should use BaseSalesforceStreaming.events().
Warning
Because the underlying protocol is based on long polling, the client should reconnect as soon as possible. In practice, the client has 40 seconds to reconnect. If your processing takes longer, a new connection must be made. You should avoid long processing between iterations, or launch this processing in a background task.
- Return type
Dict[str, Any]
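One way to follow the warning above is to hand each message to a background task so the loop returns to the generator immediately. The sketch below uses a fake generator, `fake_messages`, in place of `client.messages()`; `slow_processing` and `consume` are hypothetical names.

```python
import asyncio


async def fake_messages():
    """Stand-in for client.messages(); yields three toy messages."""
    for i in range(3):
        yield {"channel": "/topic/Foo", "data": i}


async def slow_processing(message, results):
    await asyncio.sleep(0.01)  # placeholder for long-running work
    results.append(message["data"])


async def consume():
    results = []
    tasks = []
    async for message in fake_messages():
        # Schedule the work and go back to the generator immediately
        tasks.append(asyncio.create_task(slow_processing(message, results)))
    # Wait for the outstanding work before shutting down
    await asyncio.gather(*tasks)
    return sorted(results)
```

Keeping references to the tasks and gathering them at the end avoids losing work when the loop exits.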
-
async-for events(self)
Asynchronous generator that fetches new events and returns one as soon as it is available:
    async for message in client.events():
        channel = message['channel']
        print(channel, ':', message)
This method differs from BaseSalesforceStreaming.messages() because it filters messages and provides only those related to the channels you subscribed to.
- Return type
Dict[str, Any]
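The difference between messages and events can be sketched as a simple filter. This is an illustration of the idea, not the library's code: `is_subscribed_event` and `filter_events` are hypothetical helpers, relying only on the Bayeux convention that protocol messages use channels under `/meta/`.

```python
def is_subscribed_event(message, subscribed_channels):
    """Return True for messages delivered on a channel the client subscribed to."""
    channel = message.get("channel", "")
    # Bayeux reserves the /meta/ prefix for protocol messages
    if channel.startswith("/meta/"):
        return False
    return channel in subscribed_channels


def filter_events(messages, subscribed_channels):
    """Keep only the messages that count as events for this client."""
    return [m for m in messages if is_subscribed_event(m, subscribed_channels)]
```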
-
coroutine ask_stop(self)
Ask the client to stop receiving events:
    async for event in client.events():
        ...
        if ...:
            await client.ask_stop()
This call will eventually stop the BaseSalesforceStreaming.messages() and BaseSalesforceStreaming.events() async generators, but this can take some time if it is not called inside the loop body: the generator will wait for a timeout response from the Salesforce server.
- Return type
None
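The delay described above can be illustrated with a toy poller whose stop request is only noticed between events or between polls. `ToyPoller` is hypothetical and much simpler than the real client; it is meant to show why stopping from inside the loop body takes effect quickly, while a stop requested elsewhere has to wait for the pending poll to return.

```python
import asyncio


class ToyPoller:
    """Hypothetical poller: a stop request is only noticed between polls."""

    def __init__(self, batches):
        self._batches = list(batches)
        self._stop_requested = False

    async def ask_stop(self):
        self._stop_requested = True

    async def _poll(self):
        await asyncio.sleep(0)  # placeholder for the long-polling request
        return self._batches.pop(0) if self._batches else []

    async def events(self):
        while not self._stop_requested:
            for event in await self._poll():
                yield event
                # Checked as soon as the loop body hands control back
                if self._stop_requested:
                    return


async def demo():
    poller = ToyPoller([[1, 2, 3], [4, 5]])
    seen = []
    async for event in poller.events():
        seen.append(event)
        if event == 2:
            await poller.ask_stop()
    return seen
```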
-
coroutine unsubscribe(self, channel)
Unsubscribe from a channel. Can be used directly:
    await client.unsubscribe('/topic/Foo')
This method, and the underlying protocol, are safe to start as a background task:
    loop.create_task(client.unsubscribe('/topic/Foo'))
- Return type
List[Dict[str, Any]]
-
coroutine stop(self)
Disconnect from Salesforce and close the underlying connection.
A best practice is to use the async context manager interface, which calls this method for you.
- Return type
None
Connection logic
-
token_url
The URL that should be used to fetch an access token.
- Return type
str
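As a rough sketch of how such a URL could be derived from the `sandbox` flag: the login hosts come from the parameter description, while the `/services/oauth2/token` path is the standard Salesforce OAuth2 token endpoint and is assumed here rather than taken from the library. `login_base_url` and `build_token_url` are hypothetical helpers.

```python
def login_base_url(sandbox=False):
    """Login host selection as described for the `sandbox` parameter."""
    if sandbox:
        return "https://test.salesforce.com"
    return "https://login.salesforce.com"


def build_token_url(sandbox=False):
    # Assumption: the standard Salesforce OAuth2 token path
    return login_base_url(sandbox) + "/services/oauth2/token"
```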
-
coroutine fetch_token(self)
Abstract coroutine method, implemented by connectors, that must provide an access token and the linked instance URL.
- Return type
Tuple[str, str]
-
coroutine create_connected_session(self)
This coroutine creates an aiohttp.ClientSession using the fetched token.
- Return type
ClientSession
-
coroutine close_session(self)
Close the underlying aiohttp.ClientSession connection.
- Return type
None
Bayeux/CometD logic layer
-
end_point
CometD endpoint
- Return type
str
-
coroutine get_handshake_payload(self)
Provide the handshake payload.
- Return type
Dict[str, Any]
-
coroutine get_subscribe_payload(self, channel)
Provide the subscription payload for a specific channel.
- Return type
Dict[str, Any]
-
coroutine get_unsubscribe_payload(self, channel)
Provide the unsubscription payload for a specific channel.
- Return type
Dict[str, Any]
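For orientation, the payloads of these three steps look roughly as follows in the Bayeux protocol. The functions below are hypothetical and take an explicit `client_id`; the payloads the library actually builds may carry additional fields (message ids, extensions) and are not reproduced here.

```python
def handshake_payload():
    """Minimal Bayeux handshake message."""
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_payload(client_id, channel):
    """Minimal Bayeux subscribe message for one channel."""
    return {
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": channel,
    }


def unsubscribe_payload(client_id, channel):
    """Minimal Bayeux unsubscribe message for one channel."""
    return {
        "channel": "/meta/unsubscribe",
        "clientId": client_id,
        "subscription": channel,
    }
```

The `clientId` value is the one returned by the server in its handshake response.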
-
coroutine send(self, data)
Send data to the CometD server when the connection is established:
    # Manually disconnect
    await client.send({'channel': '/meta/disconnect'})
- Return type
Union[Dict[str, Any], List[Dict[str, Any]]]
-
coroutine handshake(self)
Coroutine that performs a handshake (mandatory before any other action).
- Return type
List[Dict[str, Any]]
-
coroutine disconnect(self)
Disconnect from the SF streaming server.
- Return type
List[Dict[str, Any]]
I/O layer helpers
-
coroutine get(self, sub_url, **kwargs)
Perform a simple JSON GET request on an internal URL:
    response = await client.get('/myendpoint/')
- Return type
Union[Dict[str, Any], List[Dict[str, Any]]]
-
coroutine post(self, sub_url, **kwargs)
Perform a simple JSON POST request on an internal URL:
    response = await client.post('/myendpoint/', json={'data': 'foo'})
- Return type
Union[Dict[str, Any], List[Dict[str, Any]]]
-
coroutine request(self, method, sub_url, **kwargs)
Perform a simple JSON request on an internal URL.
- Return type
Union[Dict[str, Any], List[Dict[str, Any]]]
Other attributes
-
loop
Running event loop
- Return type
AbstractEventLoop
Connectors
-
class aio_sf_streaming.BaseConnector(*, client_id=None, client_secret=None, login_connector=None, **kwargs)
Base class for all SF connectors.
- Parameters
client_id (Optional[str]) – OAuth2 client id (mandatory)
client_secret (Optional[str]) – OAuth2 client secret (mandatory)
login_connector (Optional[BaseConnector]) – aiohttp connector used during connection. Mainly used for testing purposes.
See BaseSalesforceStreaming for other keyword arguments.
-
class aio_sf_streaming.PasswordSalesforceStreaming(*, username=None, password=None, **kwargs)
Create a SF streaming manager with password flow connection.
Main arguments are connection credentials:
- Parameters
username (Optional[str]) – User login name
password (Optional[str]) – User password
See BaseConnector for other keyword arguments.
-
class aio_sf_streaming.RefreshTokenSalesforceStreaming(*, refresh_token=None, **kwargs)
Create a SF streaming manager with refresh token flow connection.
Main arguments are connection credentials:
- Parameters
refresh_token (Optional[str]) – Refresh token
See BaseConnector for other keyword arguments.
Mixins
-
class aio_sf_streaming.AllMixin(*args, **kwargs)
Helper class to add all mixins with one class.
-
class aio_sf_streaming.TimeoutAdviceMixin
Simple mixin that automatically sets the timeout setting according to SF advice, if provided.
-
class aio_sf_streaming.ReplayType(value)
Enumeration with special replay values.
-
ALL_EVENTS = -2
Replay all events available.
-
NEW_EVENTS = -1
No replay, retrieve only new events.
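As background, Salesforce expects the replay value in a `replay` extension of the subscribe message, keyed by channel. The sketch below shows one way to attach it; `ToyReplayType` is a local copy of the documented values and `with_replay` is a hypothetical helper, so this is not necessarily how the mixin does it internally.

```python
from enum import Enum


class ToyReplayType(Enum):
    """Local copy of the documented special values, for illustration only."""
    ALL_EVENTS = -2
    NEW_EVENTS = -1


def with_replay(payload, channel, replay_id):
    """Return a copy of a subscribe payload carrying the replay extension."""
    # Accept either a plain integer replay id or an enum member
    value = getattr(replay_id, "value", replay_id)
    result = dict(payload)
    ext = dict(result.get("ext", {}))
    replay = dict(ext.get("replay", {}))
    replay[channel] = value
    ext["replay"] = replay
    result["ext"] = ext
    return result
```

Copying the dictionaries instead of mutating them keeps the original payload reusable for other channels.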
-
class aio_sf_streaming.ReplayMixin
Mixin adding replay support to the streaming client.
This mixin is not enough on its own: you must implement ReplayMixin.store_replay_id() and ReplayMixin.get_last_replay_id() in a subclass in order to have a working replay.
-
coroutine get_last_replay_id(self, channel)
Callback called to retrieve a replay id. You should override this method to implement your custom logic.
- Parameters
channel (str) – Channel name
- Return type
Union[ReplayType, int]
-
coroutine store_replay_id(self, channel, replay_id, creation_time)
Callback called to store a replay id. You should override this method to implement your custom logic.
- Parameters
channel (str) – Channel name
replay_id (int) – Replay id to store
creation_time (str) – Creation time. You should store only the replay id of the last created object, but without this value you cannot know whether events were received in order. This value is the string provided by SF.
- Return type
None
-
class aio_sf_streaming.AutoVersionMixin
Simple mixin that fetches the latest API version before connecting.
-
class aio_sf_streaming.AutoReconnectMixin(*args, **kwargs)
Mixin that automatically reconnects when asked by Salesforce.
-
class aio_sf_streaming.ReSubscribeMixin(retry_sub_duration=0.1, retry_factor=1.0, retry_max_duration=30.0, retry_max_count=20, **kwargs)
Mixin that handles subscription errors and tries again after a short delay.
- Parameters
retry_sub_duration (float) – Duration between subscribe retries if the server is too busy (initial value).
retry_factor (float) – Amplification factor between successive retries.
retry_max_duration (float) – Maximum value of the retry duration.
retry_max_count (int) – Maximum number of retries. After this count is reached, the response or exception is propagated.
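A plausible reading of these four parameters is a capped geometric backoff. The generator below is a sketch under that assumption; `retry_delays` is a hypothetical helper and the exact formula used by the mixin may differ.

```python
def retry_delays(retry_sub_duration=0.1, retry_factor=1.0,
                 retry_max_duration=30.0, retry_max_count=20):
    """Yield the wait before each retry, capped at retry_max_duration."""
    delay = retry_sub_duration
    for _ in range(retry_max_count):
        yield min(delay, retry_max_duration)
        # Grow the delay for the next attempt
        delay *= retry_factor
```

With the default factor of 1.0 the delay stays constant; a factor of 2.0 doubles it on each attempt until the cap is reached.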
-
coroutine should_retry_on_error_response(self, channel, response)
Callback called to process a response with an error message. Returns a boolean indicating whether to retry. If False is returned, the response will be returned to the caller.
By default, retries on known ‘server unavailable’ responses.
- Parameters
channel (str) – Channel name
response (Dict[str, Any]) – The response received
- Return type
bool
-
coroutine should_retry_on_exception(self, channel, exception)
Callback called to process an exception raised during subscription. Returns a boolean indicating whether to retry. If False is returned, the exception will be propagated to the caller.
By default, always returns False.
- Parameters
channel (str) – Channel name
exception (Exception) – The exception raised
- Return type
bool
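Since the default never retries on exceptions, an override is the natural place for a custom policy. The sketch below retries only on transient network errors; `RetryOnTimeoutPolicy` is a hypothetical standalone class, and the choice of exception types is an assumption. In a real client the coroutine would be defined on a subclass that includes ReSubscribeMixin.

```python
import asyncio


class RetryOnTimeoutPolicy:
    """Hypothetical override: retry only on transient network errors."""

    # Exception types treated as transient (an illustrative choice)
    retryable = (asyncio.TimeoutError, ConnectionError)

    async def should_retry_on_exception(self, channel, exception):
        return isinstance(exception, self.retryable)
```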