steamship.base package#

Submodules#

steamship.base.client module#

class steamship.base.client.Client(api_key: str = None, api_base: str = None, app_base: str = None, web_base: str = None, workspace: str = None, fail_if_workspace_exists: bool = False, profile: str = None, config_file: str = None, config: Configuration = None, trust_workspace_config: bool = False)[source]#

Bases: CamelModel, ABC

Client model.py class.

Separated primarily as a hack to prevent circular imports.

call(verb: Verb, operation: str, payload: Optional[Union[Request, dict]] = None, file: Optional[Any] = None, expect: Optional[Type[T]] = None, debug: bool = False, raw_response: bool = False, is_package_call: bool = False, package_owner: Optional[str] = None, package_id: Optional[str] = None, package_instance_id: Optional[str] = None, as_background_task: bool = False, wait_on_tasks: Optional[List[Union[str, Task]]] = None, timeout_s: Optional[float] = None) Union[Any, Task][source]#

Post to the Steamship API.

All responses have the format:

.. code-block:: json
{

“data”: “<actual response>”, “error”: {“reason”: “<message>”}

} # noqa: RST203

For the Python client we return the contents of the data field if present, and we raise an exception if the error field is filled in.

config: Configuration#
dict(**kwargs) dict[source]#

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

get(operation: str, payload: Optional[Union[Request, dict]] = None, file: Optional[Any] = None, expect: Optional[Any] = None, debug: bool = False, raw_response: bool = False, is_package_call: bool = False, package_owner: Optional[str] = None, package_id: Optional[str] = None, package_instance_id: Optional[str] = None, as_background_task: bool = False, wait_on_tasks: Optional[List[Union[str, Task]]] = None, timeout_s: Optional[float] = None) Union[Any, Task][source]#
logs(offset: int = 0, number: int = 50, invocable_handle: Optional[str] = None, instance_handle: Optional[str] = None, invocable_version_handle: Optional[str] = None, path: Optional[str] = None) Dict[str, Any][source]#

Return generated logs for a client.

The logs will be workspace-scoped. You will only receive logs for packages and plugins that you own.

Parameters
  • offset – The index of the first log entry to return. This can be used with number to page through logs.

  • number – The number of log entries to return. This can be used with offset to page through logs.

  • invocable_handle – Enables optional filtering based on the handle of package or plugin. Example: my-package

  • instance_handle – Enables optional filtering based on the handle of package instance or plugin instance. Example: my-instance

  • invocable_version_handle – Enables optional filtering based on the version handle of package or plugin. Example: 0.0.2

  • path – Enables optional filtering based on request path. Example: /generate.

Returns

Returns a dictionary containing the offset and number of log entries as well as a list of entries that match the specificed filters.

post(operation: str, payload: Optional[Union[Request, dict, BaseModel]] = None, file: Optional[Any] = None, expect: Optional[Any] = None, debug: bool = False, raw_response: bool = False, is_package_call: bool = False, package_owner: Optional[str] = None, package_id: Optional[str] = None, package_instance_id: Optional[str] = None, as_background_task: bool = False, wait_on_tasks: Optional[List[Union[str, Task]]] = None, timeout_s: Optional[float] = None) Union[Any, Task][source]#
switch_workspace(workspace_handle: Optional[str] = None, workspace_id: Optional[str] = None, fail_if_workspace_exists: bool = False, trust_workspace_config: bool = False)[source]#

Switches this client to the requested workspace, possibly creating it. If all arguments are None, the client actively switches into the default workspace.

  • API calls are performed manually to not result in circular imports.

  • Note that the default workspace is technically not necessary for API usage; it will be assumed by the Engine in the absense of a Workspace ID or Handle being manually specified in request headers.

steamship.base.configuration module#

class steamship.base.configuration.Configuration(config_file: Optional[Path] = None, *, apiKey: SecretStr, apiBase: HttpUrl = 'https://api.steamship.com/api/v1/', appBase: HttpUrl = 'https://steamship.run/', webBase: HttpUrl = 'https://steamship.com/', workspaceId: str = None, workspaceHandle: str = None, profile: Optional[str] = None)[source]#

Bases: CamelModel

api_base: HttpUrl#
api_key: SecretStr#
app_base: HttpUrl#
static default_config_file_has_api_key() bool[source]#
profile: Optional[str]#
static remove_api_key_from_default_config()[source]#
web_base: HttpUrl#
workspace_handle: str#
workspace_id: str#

steamship.base.environments module#

class steamship.base.environments.RuntimeEnvironments(value)[source]#

Bases: str, Enum

An enumeration.

LOCALHOST = 'localhost'#
REPLIT = 'replit'#
steamship.base.environments.check_environment(env: RuntimeEnvironments, interactively_set_key: bool = True)[source]#

steamship.base.error module#

exception steamship.base.error.SteamshipError(message: str = 'Undefined remote error', internal_message: Optional[str] = None, suggestion: Optional[str] = None, code: Optional[str] = None, error: Optional[Union[Exception, str]] = None)[source]#

Bases: Exception

code: str = None#
error: str = None#
static from_dict(d: Any) SteamshipError[source]#

Last resort if subclass doesn’t override: pass through.

internal_message: str = None#
log()[source]#
message: str = None#
suggestion: str = None#
to_dict() dict[source]#

steamship.base.mime_types module#

class steamship.base.mime_types.ContentEncodings[source]#

Bases: object

BASE64 = 'base64'#
class steamship.base.mime_types.MimeTypes(value)[source]#

Bases: str, Enum

An enumeration.

BINARY = 'application/octet-stream'#
DOC = 'application/msword'#
DOCX = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'#
EPUB = 'application/epub+zip'#
FILE_JSON = 'fileJson'#
GIF = 'image/gif'#
HTML = 'text/html'#
JPG = 'image/jpeg'#
JSON = 'application/json'#
MKD = 'text/markdown'#
MP3 = 'audio/mp3'#
MP4_AUDIO = 'audio/mp4'#
MP4_VIDEO = 'video/mp4'#
PDF = 'application/pdf'#
PNG = 'image/png'#
PPT = 'applicatino/ms-powerpoint'#
PPTX = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'#
RTF = 'application/rtf'#
STEAMSHIP_BLOCK_JSON = 'application/vnd.steamship-block.json.v1'#
TIFF = 'image/tiff'#
TXT = 'text/plain'#
UNKNOWN = 'unknown'#
WAV = 'audio/wav'#
WEBM_AUDIO = 'audio/webm'#
WEBM_VIDEO = 'video/webm'#

steamship.base.model module#

class steamship.base.model.CamelModel[source]#

Bases: BaseModel

class Config[source]#

Bases: object

alias_generator() str#
allow_population_by_field_name = True#
use_enum_values = True#
class steamship.base.model.GenericCamelModel[source]#

Bases: CamelModel, GenericModel

steamship.base.model.to_camel(s: str) str[source]#

steamship.base.package_spec module#

Objects for recording and reporting upon the introspected interface of a Steamship Package.

class steamship.base.package_spec.ArgSpec(name: str, parameter: Parameter)[source]#

Bases: CamelModel

An argument passed to a method.

kind: str#
name: str#
pprint(name_width: Optional[int] = None, prefix: str = '') str[source]#

Returns a pretty printable representation of this argument.

values: Optional[List[str]]#
class steamship.base.package_spec.MethodSpec(cls: object, name: str, path: str = None, verb: Verb = Verb.POST, config: Dict[str, Union[str, bool, int, float]] = None)[source]#

Bases: CamelModel

A method, callable remotely, on an object.

args: Optional[List[ArgSpec]]#
static clean_path(path: str = '') str[source]#

Ensure that the path always starts with /, and at minimum must be at least /.

config: Optional[Dict]#
doc: Optional[str]#
path: str#
pprint(name_width: Optional[int] = None, prefix: str = '  ') str[source]#

Returns a pretty printable representation of this method.

returns: str#
verb: str#
class steamship.base.package_spec.PackageSpec(*, name: str, doc: Optional[str] = None, sdkVersion: str = 'unknown', methods: Optional[List[MethodSpec]] = None)[source]#

Bases: CamelModel

A package, representing a remotely instantiable service.

doc: Optional[str]#
methods: Optional[List[MethodSpec]]#
name: str#
pprint(prefix: str = '  ') str[source]#

Returns a pretty printable representation of this package.

sdk_version: str#

steamship.base.request module#

class steamship.base.request.CreateRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#
class steamship.base.request.DeleteRequest(*, id: str)[source]#

Bases: Request

id: str#
class steamship.base.request.GetRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#
class steamship.base.request.IdentifierRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#
class steamship.base.request.ListRequest[source]#

Bases: Request

class steamship.base.request.Request[source]#

Bases: CamelModel

class steamship.base.request.UpdateRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#

steamship.base.response module#

class steamship.base.response.Response[source]#

Bases: CamelModel

steamship.base.tasks module#

class steamship.base.tasks.CreateTaskCommentRequest(*, taskId: str, externalId: str = None, externalType: str = None, externalGroup: str = None, metadata: str = None)[source]#

Bases: Request

external_group: str#
external_id: str#
external_type: str#
metadata: str#
task_id: str#
class steamship.base.tasks.ListTaskCommentRequest(*, taskId: str = None, externalId: str = None, externalType: str = None, externalGroup: str = None)[source]#

Bases: Request

external_group: str#
external_id: str#
external_type: str#
task_id: str#
class steamship.base.tasks.Task(*, client: Client = None, taskId: str = None, userId: str = None, workspaceId: str = None, expect: Type = None, input: str = None, output: T = None, state: str = None, statusMessage: str = None, statusSuggestion: str = None, statusCode: str = None, statusCreatedOn: str = None, taskType: str = None, taskExecutor: str = None, taskCreatedOn: str = None, taskLastModifiedOn: str = None, remoteStatusInput: Optional[Dict] = None, remoteStatusOutput: Optional[Dict] = None, remoteStatusMessage: str = None, assignedWorker: str = None, startedAt: str = None, maxRetries: int = None, retries: int = None)[source]#

Bases: GenericCamelModel, Generic[T]

Encapsulates a unit of asynchronously performed work.

add_comment(external_id: Optional[str] = None, external_type: Optional[str] = None, external_group: Optional[str] = None, metadata: Optional[Any] = None) TaskComment[source]#
as_error() SteamshipError[source]#
assigned_worker: str#
client: Client#
expect: Type#
static get(client, _id: Optional[str] = None, handle: Optional[str] = None) Task[source]#
input: str#
max_retries: int#
output: T#
classmethod parse_obj(obj: Any) Task[source]#
post_update(fields: Optional[Set[str]] = None) Task[source]#

Updates this task in the Steamship Engine.

refresh()[source]#
remote_status_input: Optional[Dict]#
remote_status_message: str#
remote_status_output: Optional[Dict]#
retries: int#
started_at: str#
state: str#
status_code: str#
status_created_on: str#
status_message: str#
status_suggestion: str#
task_created_on: str#
task_executor: str#
task_id: str#
task_last_modified_on: str#
task_type: str#
update(other: Optional[Task] = None)[source]#

Incorporates a Task into this object.

user_id: str#
wait(max_timeout_s: float = 180, retry_delay_s: float = 1, on_each_refresh: Optional[Callable[[int, float, Task], None]] = None)[source]#

Polls and blocks until the task has succeeded or failed (or timeout reached).

Parameters
  • max_timeout_s (int) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional call back you can get after each refresh is made, including success state refreshes. The signature represents: (refresh #, total elapsed time, task)

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

workspace_id: str#
class steamship.base.tasks.TaskComment(*, client: Client = None, id: str = None, userId: str = None, taskId: str = None, externalId: str = None, externalType: str = None, externalGroup: str = None, metadata: Any = None, createdAt: str = None)[source]#

Bases: CamelModel

client: Client#
static create(client: Client, task_id: Optional[str] = None, external_id: Optional[str] = None, external_type: Optional[str] = None, external_group: Optional[str] = None, metadata: Optional[Any] = None) TaskComment[source]#
created_at: str#
delete() TaskComment[source]#
external_group: str#
external_id: str#
external_type: str#
id: str#
static list(client: Client, task_id: Optional[str] = None, external_id: Optional[str] = None, external_type: Optional[str] = None, external_group: Optional[str] = None) TaskCommentList[source]#
metadata: Any#
classmethod parse_obj(obj: Any) BaseModel[source]#
task_id: str#
user_id: str#
class steamship.base.tasks.TaskCommentList(*, comments: List[TaskComment])[source]#

Bases: CamelModel

comments: List[TaskComment]#
class steamship.base.tasks.TaskRunRequest(*, taskId: str)[source]#

Bases: Request

task_id: str#
class steamship.base.tasks.TaskState[source]#

Bases: object

failed = 'failed'#
running = 'running'#
succeeded = 'succeeded'#
waiting = 'waiting'#
class steamship.base.tasks.TaskStatusRequest(*, taskId: str)[source]#

Bases: Request

task_id: str#
class steamship.base.tasks.TaskType[source]#

Bases: object

infer = 'infer'#
internal_api = 'internalApi'#
train = 'train'#

steamship.base.utils module#

Module contents#

class steamship.base.Configuration(config_file: Optional[Path] = None, *, apiKey: SecretStr, apiBase: HttpUrl = 'https://api.steamship.com/api/v1/', appBase: HttpUrl = 'https://steamship.run/', webBase: HttpUrl = 'https://steamship.com/', workspaceId: str = None, workspaceHandle: str = None, profile: Optional[str] = None)[source]#

Bases: CamelModel

api_base: HttpUrl#
api_key: SecretStr#
app_base: HttpUrl#
static default_config_file_has_api_key() bool[source]#
profile: Optional[str]#
static remove_api_key_from_default_config()[source]#
web_base: HttpUrl#
workspace_handle: str#
workspace_id: str#
class steamship.base.MimeTypes(value)[source]#

Bases: str, Enum

An enumeration.

BINARY = 'application/octet-stream'#
DOC = 'application/msword'#
DOCX = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'#
EPUB = 'application/epub+zip'#
FILE_JSON = 'fileJson'#
GIF = 'image/gif'#
HTML = 'text/html'#
JPG = 'image/jpeg'#
JSON = 'application/json'#
MKD = 'text/markdown'#
MP3 = 'audio/mp3'#
MP4_AUDIO = 'audio/mp4'#
MP4_VIDEO = 'video/mp4'#
PDF = 'application/pdf'#
PNG = 'image/png'#
PPT = 'applicatino/ms-powerpoint'#
PPTX = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'#
RTF = 'application/rtf'#
STEAMSHIP_BLOCK_JSON = 'application/vnd.steamship-block.json.v1'#
TIFF = 'image/tiff'#
TXT = 'text/plain'#
UNKNOWN = 'unknown'#
WAV = 'audio/wav'#
WEBM_AUDIO = 'audio/webm'#
WEBM_VIDEO = 'video/webm'#
class steamship.base.RuntimeEnvironments(value)[source]#

Bases: str, Enum

An enumeration.

LOCALHOST = 'localhost'#
REPLIT = 'replit'#
exception steamship.base.SteamshipError(message: str = 'Undefined remote error', internal_message: Optional[str] = None, suggestion: Optional[str] = None, code: Optional[str] = None, error: Optional[Union[Exception, str]] = None)[source]#

Bases: Exception

code: str = None#
error: str = None#
static from_dict(d: Any) SteamshipError[source]#

Last resort if subclass doesn’t override: pass through.

internal_message: str = None#
log()[source]#
message: str = None#
suggestion: str = None#
to_dict() dict[source]#
class steamship.base.Task(*, client: Client = None, taskId: str = None, userId: str = None, workspaceId: str = None, expect: Type = None, input: str = None, output: T = None, state: str = None, statusMessage: str = None, statusSuggestion: str = None, statusCode: str = None, statusCreatedOn: str = None, taskType: str = None, taskExecutor: str = None, taskCreatedOn: str = None, taskLastModifiedOn: str = None, remoteStatusInput: Optional[Dict] = None, remoteStatusOutput: Optional[Dict] = None, remoteStatusMessage: str = None, assignedWorker: str = None, startedAt: str = None, maxRetries: int = None, retries: int = None)[source]#

Bases: GenericCamelModel, Generic[T]

Encapsulates a unit of asynchronously performed work.

add_comment(external_id: Optional[str] = None, external_type: Optional[str] = None, external_group: Optional[str] = None, metadata: Optional[Any] = None) TaskComment[source]#
as_error() SteamshipError[source]#
assigned_worker: str#
client: Client#
expect: Type#
static get(client, _id: Optional[str] = None, handle: Optional[str] = None) Task[source]#
input: str#
max_retries: int#
output: T#
classmethod parse_obj(obj: Any) Task[source]#
post_update(fields: Optional[Set[str]] = None) Task[source]#

Updates this task in the Steamship Engine.

refresh()[source]#
remote_status_input: Optional[Dict]#
remote_status_message: str#
remote_status_output: Optional[Dict]#
retries: int#
started_at: str#
state: str#
status_code: str#
status_created_on: str#
status_message: str#
status_suggestion: str#
task_created_on: str#
task_executor: str#
task_id: str#
task_last_modified_on: str#
task_type: str#
update(other: Optional[Task] = None)[source]#

Incorporates a Task into this object.

user_id: str#
wait(max_timeout_s: float = 180, retry_delay_s: float = 1, on_each_refresh: Optional[Callable[[int, float, Task], None]] = None)[source]#

Polls and blocks until the task has succeeded or failed (or timeout reached).

Parameters
  • max_timeout_s (int) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional call back you can get after each refresh is made, including success state refreshes. The signature represents: (refresh #, total elapsed time, task)

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

workspace_id: str#
class steamship.base.TaskState[source]#

Bases: object

failed = 'failed'#
running = 'running'#
succeeded = 'succeeded'#
waiting = 'waiting'#
steamship.base.check_environment(env: RuntimeEnvironments, interactively_set_key: bool = True)[source]#