Async Client
An asynchronous Client for pythonic access to the Anaplan Integration API v2. This Client provides high-level abstractions over the API, so you can deal with Python objects and simple functions rather than implementation details like HTTP, JSON, compression, chunking etc.
For more information, quick start guides and detailed instructions refer to: Anaplan SDK.
alm
property
alm: _AsyncAlmClient
To use the Application Lifecycle Management (ALM) API, you need a Professional or Enterprise subscription.
The ALM Client provides access to the Anaplan ALM API. This is useful for more advanced use cases where you need to retrieve meta information for your models, read or create revisions, spawn sync tasks or generate comparison reports.
Returns:
Type | Description |
---|---|
`_AsyncAlmClient` | The ALM Client. |
transactional
property
transactional: _AsyncTransactionalClient
The Transactional Client provides access to the Anaplan Transactional API. This is useful for more advanced use cases where you need to interact with the Anaplan Model in a more granular way.
If you instantiated the client without a `model_id`, this will raise a `ValueError`, since none of the endpoints can be invoked without the model Id.
Returns:
Type | Description |
---|---|
`_AsyncTransactionalClient` | The Transactional Client. |
__init__
__init__(workspace_id: str | None = None, model_id: str | None = None, user_email: str | None = None, password: str | None = None, certificate: str | bytes | None = None, private_key: str | bytes | None = None, private_key_password: str | bytes | None = None, timeout: float = 30, retry_count: int = 2, status_poll_delay: int = 1, upload_chunk_size: int = 25000000, allow_file_creation: bool = False) -> None
An asynchronous Client for pythonic access to the Anaplan Integration API v2: https://anaplan.docs.apiary.io/. This Client provides high-level abstractions over the API, so you can deal with Python objects and simple functions rather than implementation details like HTTP, JSON, compression, chunking etc.
For more information, quick start guides and detailed instructions refer to: https://vinzenzklass.github.io/anaplan-sdk.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`workspace_id` | `str \| None` | The Anaplan workspace Id. You can copy this from the browser URL or find it using an HTTP Client like Postman, Paw, Insomnia etc. | `None` |
`model_id` | `str \| None` | The identifier of the model. | `None` |
`user_email` | `str \| None` | A valid email registered with the Anaplan Workspace you are attempting to access. The associated user must have Workspace Admin privileges. | `None` |
`password` | `str \| None` | Password for the given `user_email`. | `None` |
`certificate` | `str \| bytes \| None` | The absolute path to the client certificate file or the certificate itself. | `None` |
`private_key` | `str \| bytes \| None` | The absolute path to the private key file or the private key itself. | `None` |
`private_key_password` | `str \| bytes \| None` | The password to access the private key if there is one. | `None` |
`timeout` | `float` | The timeout in seconds for the HTTP requests. | `30` |
`retry_count` | `int` | The number of times to retry an HTTP request if it fails. Set this to 0 to never retry. Defaults to 2, meaning each HTTP Operation will be tried a total of 2 times. | `2` |
`status_poll_delay` | `int` | The delay between polling the status of a task. | `1` |
`upload_chunk_size` | `int` | The size of the chunks to upload. This is the maximum size of each chunk. Defaults to 25MB. | `25000000` |
`allow_file_creation` | `bool` | Whether to allow the creation of new files. Defaults to False since this is typically unintentional and may well be unwanted behaviour in the API altogether. A file that is created this way will not be referenced by any action in Anaplan until manually assigned, so there is typically no value in dynamically creating new files and uploading content to them. | `False` |
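As a rough illustration of how these parameters fit together, the sketch below collects constructor arguments from environment variables. The environment variable names and the helper `client_kwargs_from_env` are hypothetical; only the dictionary keys come from the signature above. It also assumes that email/password and certificate/private key are alternative ways to authenticate, and the import path in the final comment is an assumption.

```python
import os


def client_kwargs_from_env(env=os.environ) -> dict:
    """Build keyword arguments for the client from environment variables.

    The variable names are hypothetical; the keys mirror the documented
    __init__ parameters.
    """
    kwargs = {
        "workspace_id": env.get("ANAPLAN_WORKSPACE_ID"),
        "model_id": env.get("ANAPLAN_MODEL_ID"),
        "retry_count": int(env.get("ANAPLAN_RETRY_COUNT", "2")),
        "timeout": float(env.get("ANAPLAN_TIMEOUT", "30")),
    }
    if env.get("ANAPLAN_CERTIFICATE"):
        # Certificate authentication: path or content of certificate and key.
        kwargs["certificate"] = env["ANAPLAN_CERTIFICATE"]
        kwargs["private_key"] = env.get("ANAPLAN_PRIVATE_KEY")
    else:
        # Basic authentication with email and password.
        kwargs["user_email"] = env.get("ANAPLAN_USER")
        kwargs["password"] = env.get("ANAPLAN_PASSWORD")
    return kwargs


example = client_kwargs_from_env(
    {
        "ANAPLAN_WORKSPACE_ID": "ws",
        "ANAPLAN_USER": "me@example.com",
        "ANAPLAN_PASSWORD": "secret",
    }
)
# Assumed usage: client = anaplan_sdk.AsyncClient(**example)
```

Keeping credentials out of the source file this way is a design choice of the sketch, not a requirement of the Client.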
export_and_download
async
export_and_download(action_id: int) -> bytes
Convenience wrapper around `run_action()` and `get_file()` to run an export action and download the exported content in one call.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`action_id` | `int` | The identifier of the action to run. | required |
Returns:
Type | Description |
---|---|
`bytes` | The content of the exported file. |
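The two-step behaviour described above might look roughly like the following. `StubClient` is a hypothetical stand-in so the sketch runs without network access, and the action id is made up. The sketch also assumes the exported file is addressed by the same id as the export action, which the single `action_id` parameter suggests but the text does not state.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in exposing two of the documented coroutines."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, int]] = []

    async def run_action(self, action_id: int) -> None:
        self.calls.append(("run_action", action_id))

    async def get_file(self, file_id: int) -> bytes:
        self.calls.append(("get_file", file_id))
        return b"region,amount\nEMEA,42\n"


async def export_and_download(client: StubClient, action_id: int) -> bytes:
    # Run the export first, then fetch the file it produced.
    # Assumption: the exported file shares the id of the export action.
    await client.run_action(action_id)
    return await client.get_file(action_id)


stub = StubClient()
content = asyncio.run(export_and_download(stub, 116000000000))
rows = content.decode().splitlines()
```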
from_existing
classmethod
from_existing(existing: Self, workspace_id: str, model_id: str) -> Self
Create a new instance of the Client from an existing instance. This is useful if you want to interact with multiple models or workspaces in the same script but share the same authentication and configuration. This creates a shallow copy of the existing client and updates the relevant attributes to the new workspace and model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`existing` | `Self` | The existing instance to copy. | required |
`workspace_id` | `str` | The workspace Id to use. | required |
`model_id` | `str` | The model Id to use. | required |
Returns:
Type | Description |
---|---|
`Self` | A new instance of the Client. |
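To make the "shallow copy" wording concrete, here is a minimal sketch of the technique on a hypothetical `StubClient`. The `session` attribute is invented for illustration and merely stands in for whatever authentication state the real Client shares between copies.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class StubClient:
    """Hypothetical stand-in; the real client holds more state."""

    workspace_id: str
    model_id: str
    session: dict = field(default_factory=dict)  # stands in for shared auth state

    @classmethod
    def from_existing(
        cls, existing: "StubClient", workspace_id: str, model_id: str
    ) -> "StubClient":
        # Shallow copy: nested objects are shared, not duplicated.
        new = copy.copy(existing)
        new.workspace_id = workspace_id
        new.model_id = model_id
        return new


first = StubClient("ws-a", "model-a", {"token": "abc"})
second = StubClient.from_existing(first, "ws-b", "model-b")
```

Because the copy is shallow, both instances point at the same `session` object, while the original keeps its own workspace and model ids.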
get_file
async
get_file(file_id: int) -> bytes
Retrieves the content of the specified file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`file_id` | `int` | The identifier of the file to retrieve. | required |
Returns:
Type | Description |
---|---|
`bytes` | The content of the file. |
get_file_stream
async
get_file_stream(file_id: int) -> AsyncIterator[bytes]
Retrieves the content of the specified file as a stream of chunks. The chunks are yielded one by one, so you can process them as they arrive. This is useful for large files where you don't want to or cannot load the entire file into memory at once.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`file_id` | `int` | The identifier of the file to retrieve. | required |
Returns:
Type | Description |
---|---|
`AsyncIterator[bytes]` | A generator yielding the chunks of the file. |
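A minimal sketch of consuming such a stream chunk by chunk follows. `fake_file_stream` is a hypothetical stand-in for `get_file_stream()`, written as an async generator on the assumption that the real method is consumed with `async for`; the file id and the chunk contents are made up.

```python
import asyncio
import hashlib
from typing import AsyncIterator


async def fake_file_stream(file_id: int) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for client.get_file_stream(file_id)."""
    for chunk in (b"id,value\n", b"1,10\n", b"2,20\n"):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield chunk


async def digest_stream(stream: AsyncIterator[bytes]) -> tuple[int, str]:
    # Only one chunk is held in memory at a time.
    total, sha = 0, hashlib.sha256()
    async for chunk in stream:
        total += len(chunk)
        sha.update(chunk)
    return total, sha.hexdigest()


size, checksum = asyncio.run(digest_stream(fake_file_stream(113000000000)))
```

The same loop body could write each chunk to disk instead of hashing it.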
get_task_status
async
get_task_status(action_id: int, task_id: str) -> dict[str, float | int | str | list | dict | bool]
Retrieves the status of the specified task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`action_id` | `int` | The identifier of the action that was invoked. | required |
`task_id` | `str` | The identifier of the spawned task. | required |
Returns:
Type | Description |
---|---|
`dict[str, float \| int \| str \| list \| dict \| bool]` | The status of the task as returned by the API. For more information see: https://anaplan.docs.apiary.io. |
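If you handle task status yourself, a polling loop could be sketched as below. `StubClient` is a hypothetical stand-in, and the response keys `taskState`, `result` and `successful` are assumptions about the API payload; check them against the API documentation linked above before relying on them.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in that reports completion on the third poll."""

    def __init__(self) -> None:
        self.polls = 0

    async def invoke_action(self, action_id: int) -> str:
        return "task-1"

    async def get_task_status(self, action_id: int, task_id: str) -> dict:
        self.polls += 1
        if self.polls < 3:
            return {"taskState": "IN_PROGRESS"}
        return {"taskState": "COMPLETE", "result": {"successful": True}}


async def wait_for_task(client, action_id: int, delay: float = 0.01) -> dict:
    task_id = await client.invoke_action(action_id)
    while True:
        status = await client.get_task_status(action_id, task_id)
        # Assumed response shape: "taskState" becomes "COMPLETE" when done.
        if status.get("taskState") == "COMPLETE":
            return status
        await asyncio.sleep(delay)


stub = StubClient()
final = asyncio.run(wait_for_task(stub, 118000000000))
```

A production loop would normally also cap the number of polls or the total waiting time.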
invoke_action
async
invoke_action(action_id: int) -> str
You may want to consider using `run_action()` instead.
Invokes the specified Anaplan Action and returns the spawned Task identifier. This is useful if you want to handle the Task status yourself or if you want to run multiple Actions in parallel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`action_id` | `int` | The identifier of the Action to run. Can be any Anaplan Invokable. | required |
Returns:
Type | Description |
---|---|
`str` | The identifier of the spawned Task. |
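Starting several Actions in parallel could be sketched with `asyncio.gather`, as below. `StubClient` and the action ids are hypothetical; only the `invoke_action(action_id) -> str` shape comes from the signature above.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in for the documented invoke_action() coroutine."""

    async def invoke_action(self, action_id: int) -> str:
        await asyncio.sleep(0.01)  # simulate the HTTP round trip
        return f"task-for-{action_id}"


async def invoke_all(client, action_ids: list[int]) -> dict[int, str]:
    # Start all actions concurrently and map each action to its task id.
    task_ids = await asyncio.gather(*(client.invoke_action(a) for a in action_ids))
    return dict(zip(action_ids, task_ids))


tasks = asyncio.run(invoke_all(StubClient(), [112000000001, 112000000002]))
```

`asyncio.gather` returns results in the order the awaitables were passed, which is what makes the `zip` pairing safe.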
list_actions
async
list_actions() -> list[Action]
Lists all the Actions in the Model. This will only return the Actions listed under Other Actions in Anaplan. For Imports, Exports, and Processes, see their respective methods instead.
Returns:
Type | Description |
---|---|
`list[Action]` | The List of Actions. |
list_exports
async
list_exports() -> list[Export]
list_files
async
list_files() -> list[File]
list_imports
async
list_imports() -> list[Import]
list_models
async
list_models() -> list[Model]
Lists all the Models the authenticated user has access to.
Returns:
Type | Description |
---|---|
`list[Model]` | The List of Models. |
list_processes
async
list_processes() -> list[Process]
Lists all the Processes in the Model.
Returns:
Type | Description |
---|---|
`list[Process]` | The List of Processes. |
list_workspaces
async
list_workspaces() -> list[Workspace]
Lists all the Workspaces the authenticated user has access to.
Returns:
Type | Description |
---|---|
`list[Workspace]` | The List of Workspaces. |
run_action
async
run_action(action_id: int) -> None
Runs the specified Anaplan Action and validates the spawned task. If the Action fails or completes with errors, this raises an `AnaplanActionError`. Failed Tasks are usually not something you can recover from at runtime and often require manual changes in Anaplan, e.g. updating the mapping of an Import. So, for convenience, this raises an Exception instead of returning task status information that the caller would have to inspect. You can catch the Exception if, for example, you think that one of the uploaded chunks may have been dropped and simply retrying with new data may help.
If you need more information or control, you can use `invoke_action()` and `get_task_status()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`action_id` | `int` | The identifier of the Action to run. Can be any Anaplan Invokable: Processes, Imports, Exports, Other Actions. | required |
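Handling a failed Action might be sketched as follows. Both `StubClient` and the locally defined `AnaplanActionError` are stand-ins so the example runs on its own; in real code you would import the exception from the SDK, and its import path is not shown on this page.

```python
import asyncio


class AnaplanActionError(Exception):
    """Local stand-in for the exception named above."""


class StubClient:
    """Hypothetical stand-in whose action always fails."""

    async def run_action(self, action_id: int) -> None:
        raise AnaplanActionError(f"Action {action_id} completed with errors")


async def run_or_report(client, action_id: int) -> str:
    try:
        await client.run_action(action_id)
    except AnaplanActionError as error:
        # Usually not recoverable at runtime: report it and stop.
        return f"failed: {error}"
    return "ok"


outcome = asyncio.run(run_or_report(StubClient(), 112000000000))
```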
upload_and_import
async
upload_and_import(file_id: int, content: str | bytes, action_id: int) -> None
Convenience wrapper around `upload_file()` and `run_action()` to upload content to a file and run an import action in one call.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`file_id` | `int` | The identifier of the file to upload to. | required |
`content` | `str \| bytes` | The content to upload. This Content will be compressed before uploading. If you are passing the Input as bytes, pass it uncompressed to avoid redundant work. | required |
`action_id` | `int` | The identifier of the action to run after uploading the content. | required |
upload_file
async
upload_file(file_id: int, content: str | bytes) -> None
Uploads the content to the specified file. If there are several chunks, the individual chunks are uploaded concurrently.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`file_id` | `int` | The identifier of the file to upload to. | required |
`content` | `str \| bytes` | The content to upload. This Content will be compressed before uploading. If you are passing the Input as bytes, pass it uncompressed to avoid redundant work. | required |
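The general technique of splitting content into chunks and uploading them concurrently can be sketched as below. This is not the library's implementation: `split_chunks`, `upload_concurrently` and `put_chunk` are hypothetical names, the HTTP request is replaced by a stand-in, and compression is left out for brevity.

```python
import asyncio


def split_chunks(content: bytes, chunk_size: int) -> list[bytes]:
    """Split content into pieces of at most chunk_size bytes."""
    return [content[i : i + chunk_size] for i in range(0, len(content), chunk_size)]


async def upload_concurrently(content: bytes, chunk_size: int) -> dict[int, bytes]:
    received: dict[int, bytes] = {}

    async def put_chunk(index: int, chunk: bytes) -> None:
        # Hypothetical stand-in for the HTTP request that uploads one chunk.
        await asyncio.sleep(0)
        received[index] = chunk

    chunks = split_chunks(content, chunk_size)
    await asyncio.gather(*(put_chunk(i, c) for i, c in enumerate(chunks)))
    return received


data = b"a" * 10 + b"b" * 10 + b"c" * 5
uploaded = asyncio.run(upload_concurrently(data, chunk_size=10))
```

Each chunk carries its index, so the order in which the uploads finish does not matter when the content is reassembled.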
upload_file_stream
async
upload_file_stream(file_id: int, content: AsyncIterator[bytes | str] | Iterator[str | bytes]) -> None
Uploads the content to the specified file as a stream of chunks. This is useful either for large files where you don't want to or cannot load the entire file into memory at once, or if you simply do not know the number of chunks ahead of time and instead just want to pass on chunks, e.g. consumed from a queue, until it is exhausted. In this case, you can pass a generator that yields the chunks of the file one by one to this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`file_id` | `int` | The identifier of the file to upload to. | required |
`content` | `AsyncIterator[bytes \| str] \| Iterator[str \| bytes]` | An Iterator or AsyncIterator yielding the chunks of the file. (Most likely a generator). | required |
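The queue scenario mentioned above could be sketched as follows. `drain` is a hypothetical async generator that yields chunks until it sees a `None` sentinel, and `fake_upload_file_stream` is a stand-in for `upload_file_stream()` that returns the chunks it received so the result can be inspected; the real method returns `None`.

```python
import asyncio
from typing import AsyncIterator, Optional


async def drain(queue: "asyncio.Queue[Optional[bytes]]") -> AsyncIterator[bytes]:
    """Yield chunks from the queue until a None sentinel arrives."""
    while (chunk := await queue.get()) is not None:
        yield chunk


async def fake_upload_file_stream(
    file_id: int, content: AsyncIterator[bytes]
) -> list[bytes]:
    # Hypothetical stand-in for client.upload_file_stream(); it just collects.
    return [chunk async for chunk in content]


async def main() -> list[bytes]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in (b"id,value\n", b"1,10\n", None):  # None marks the end
        queue.put_nowait(item)
    return await fake_upload_file_stream(113000000000, drain(queue))


sent = asyncio.run(main())
```

Using a sentinel value is one common way to signal exhaustion; a producer task could keep filling the queue while the upload is in progress.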