Skip to content

Convenience classes

Job, ProcessingStep, and Workdata all have convenience classes that make it easy to interact with the client by hiding away some details. These classes wrap the corresponding HCO and any other classes that are useful. This allows to simplify common tasks but may lead to slower performance in some cases.

The functions of these classes return the current instance which allows method chaining as in builder pattern. They are located inside tool package and are imported like:

from pinexq_client.job_management import Job, JobQuery, ProcessingStep, ProcessingStepQuery, WorkData, WorkDataQuery

These classes are mostly instantiated using a initialized httpx client:

from httpx import Client
client = Client(
base_url="https://myapihost.com:80",
headers={'x-api-key': 'my-api-key'}
)

For more details see Setup the client

The following code creates a job instance, selects a processing step, and assigns a workdata collection to the first input dataslot, in a single statement.

job = (Job(client=client)
.create(name=f"TestJob-{uuid.uuid4()}")
.select_processing(processing_step_instance=processing_step)
.assign_collection_input_dataslot(0, work_data_instances=[work_data1, work_data2]))

The following code creates a ProcessingStep instance, sets tags, edits its properties, uploads configuration, and configures default parameters, in a single statement.

processing_step = (ProcessingStep(client)
.create(processing_step_name, processing_step_name)
.set_tags(["tag1", "tag2", "tag3"])
.edit_properties(new_title=processing_step_name + '-edited',
new_function_name=processing_step_name + '-edited')
.upload_configuration(json_configuration)
.configure_default_parameters(**processing_step_parameters))

The following code creates a WorkData instance, sets tags, and makes it deletable, in a single statement.

(WorkData(client=client)
.create(filename=file_name, binary=content.encode(), mediatype=MediaTypes.TEXT)
.set_tags([TEST_TAG])
.allow_deletion())

This class provides an easy interface to execute multiple jobs and wait for their completion.

job1 = (...)
job2 = (...)
jobs = [job1, job2]
(JobGroup(client) # initialize JobGroup
.add_jobs(jobs) # add jobs to the group
.start_all() # start all jobs in the group
.wait_all()) # wait for all of them to complete
for job in jobs:
assert job.get_state() == JobStates.completed

JobGroup can also be initialized from a JobQueryResult object using the from_query_result() method.

(JobGroup
.from_query_result(client, jobs) # initialize JobGroup using a JobQueryResult
.start_all() # start all jobs in the group
.wait_all()) # wait for all of them to complete
  • incomplete_job_count() -> returns a count of incomeplete jobs in the group
  • jobs_with_error() -> returns a list of jobs from the group that completed with error
  • remove(job_name: str) -> removes all jobs from the group whose name matches the provided name
  • clear() -> removes all jobs from the group
  • get_jobs() -> returns the list of jobs in the group

Query convenience classes (JobQuery, ProcessingStepQuery, WorkDataQuery)

Section titled “Query convenience classes (JobQuery, ProcessingStepQuery, WorkDataQuery)”

The JobQuery, ProcessingStepQuery, and WorkDataQuery classes provide a fluent way to search for resources using various filters and sorting options.

from pinexq_client.job_management import JobQuery, ProcessingStepQuery, WorkDataQuery, JobStates
# Find all failed jobs with a specific tag
job_query = JobQuery.create(client, state=JobStates.error, tags_by_and=["production"])
# or
job_query = JobQuery.from_parameters(client, jobQueryParametersObject)
# Find a processing step by name
ps_query = ProcessingStepQuery.create(client, function_name="my_function", is_public=True)
# or
ps_query = ProcessingStepQuery.from_parameters(client, psQueryParametersObject)
# Find work data by name contains
wd_query = WorkDataQuery.create(client, name_contains="results")
# or
wd_query = WorkDataQuery.from_parameters(client, wdQueryParametersObject)

All query classes have the following methods:

  • create(client, **kwargs): Creates a query from flat parameters. Supports filtering, sorting (sort_by_property_name, sort_by_sort_type), and pagination (page_size, page_offset).
  • from_parameters(client, parameters): Creates a query from a parameters object (e.g., JobQueryParameters).
  • iter(): Returns an iterator that yields convenience objects (Job, ProcessingStep, or WorkData), automatically fetching next pages as needed.
  • count(): Returns the total number of entities matching the query.
  • exists(): Returns True if at least one entity matches.
  • first() / one(): Returns the first matching object or exactly one (raises error if multiple).
  • to_list(): Returns all matching objects as a list.
  • execute(): Executes the query and returns a raw HCO result (e.g., JobQueryResultHco).
  • to_job_group(): Returns a JobGroup containing all jobs matching the query.