SSP Technical Design: Bulk Actions

Overview

"Actionable Search Result/Caseload" features, e.g. bulk program status change, bulk early alert submission, etc., require asynchronous processing since users can easily submit "actions" that we cannot guarantee will complete within a reasonable time frame for the HTTP request/response cycle. SSP needs a persistent task queue to support this asynchronous processing.

For SSP 2.6.0, the first bulk actions to be supported are:

  1. Export Search/Caseload
  2. Program Status Change
  3. Watch/Unwatch
  4. Send Email to Search/Caseload

Export Search/Caseload should not be any more heavy-weight than the original read operation that resulted in the exportable dataset, but may need to be arbitrarily bounded to prevent excessively large downloads (we know some external_person tables have ~1 million rows). Watch/Unwatch should, in theory, not be any more heavy-weight than the bulk caseload reassignment functionality we already support. But again, the result sets being manipulated are potentially much larger, so they may need to be bounded. But even if those two operations can be bounded in a way that they do not require asynchronous processing, Program Status Change and Send Email to Search/Caseload are heavy-weight enough that they likely do require a task queue, even for run-of-the-mill batch sizes. And we know that batch submission of Early Alerts, which is even more heavy-weight, is likely the next bulk action to be implemented. So building a generic task queue in the 2.6.0 time frame is justifiable.

The Task Queue

The Task Queue will consist of:

  1. Database table - For storing the queue itself
  2. Task queue service/DAO - Standard app-layer technical infrastructure for encapsulating the database table
  3. Task scheduler - Background job for popping tasks off the queue
  4. Task executor - For dispatching the work asynchronously w/r/t the scheduled job. Prevents long-running jobs from clogging the queue.
  5. APIs - Instead of operating on entities one-by-one, bulk operation requests will carry payloads specifying both the proposed state change and either a list of entities to be modified or a query specification for deferred calculation of such lists. Responses will be pointers to the created task queue entry. An API for browsing the queue itself is likely out of scope at this time.

Task Queue Persistence

| Column | Type | Nullable | Description |
|---|---|---|---|
| id | UUID | No | Standard PK field |
| created_date | timestamp | No | Standard audit field |
| created_by | UUID | No | Standard audit field. FK to person.id |
| modified_date | timestamp | No | Standard audit field |
| modified_by | UUID | No | Standard audit field. FK to person.id |
| owner_id | UUID | No | Actual end user requesting the work. Similar to map_plan.owner_id. FK to person.id |
| run_as_id | UUID | No | The person as whom the work should execute. Typically set to owner_id except in cases where the job must run with elevated, system-level permissions. FK to person.id |
| execution_component_name | varchar | No | Spring bean name to which work execution should delegate. Bean must implement QueuedTaskExecutor. See below |
| execution_spec | text | Yes | JSON-encoded Java POJO representing the originally proposed unit of work. E.g. for bulk mail send, a JSON struct containing the requested subject, body, CC addrs, and list of targeted person IDs. Typically deserialized as part of QueuedTaskExecutor.execute() internals. |
| execution_state | text | No | Typically JSON-encoded state of task-execution-in-progress. Long-running tasks may choose to batch their operations into multiple transactions. This allows such work to bookmark progress such that restarts do not result in duplicate operations, and without modifying the original execution_spec. Defaults to '{}', i.e. the empty map. |
| workflow_status | varchar | No | Enum. See the value descriptions following this table. |
| workflow_status_desc | text | Yes | Free-form text adding detail to workflow_status. Usually only set if status is ERROR or FAILURE, in which case it is usually set to a stack trace or error code. Error code preferred (similar to LTI). |
| scheduling_started_date | timestamp | Yes | Moment at which the task was popped off the queue and set to a SCHEDULING status. |
| scheduled_by_process | varchar | Yes | Opaque string which identifies the process which owns the task lock represented by a SCHEDULING status. Each application startup should generate a new identifying string for that application instance. In a single-node deployment (which is all that SSP currently supports), if workflow_status = SCHEDULING, but scheduled_by_process is not the application's current identifying string, the task has been abandoned and is eligible for restart. In multi-node deployments, process identifiers can be correlated with a heartbeat table (which does not yet exist) to determine if the owning process has gone AWOL, in which case the task is also eligible for restart. Preferred identifier format: <system_id>-<startup-timestamp>. <system_id> can be sourced from ssp-config.properties |
| execution_started_date | timestamp | Yes | Moment at which the task exited the scheduling mechanism and was set into an EXECUTING status. |
| workflow_stopped_date | timestamp | Yes | Moment at which the task transitioned into a COMPLETED, ERROR, or FAILURE state. |

workflow_status values:

  QUEUED - Default. Eligible to be picked up by the app for processing.
  SCHEDULING - Task has been picked up by the app and is being prepared for execution, usually in the app's internal queues. In combination with scheduled_by_process, acts as a lock indicating that an app instance has taken ownership of the task. When set into this state, scheduling_started_date should also be set and execution_started_date and workflow_status_desc cleared.
  EXECUTING - Application's internal scheduling mechanism has fully processed the task and task work is actually being performed.
  COMPLETED - Task execution completed successfully.
  ERROR - Task could not run to completion because of a problem in the app's scheduling mechanism.
  FAILURE - Task could not run to completion because of a problem in the task itself.
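
As a rough illustration of the workflow_status values and the single-node abandonment rule for scheduled_by_process, the following minimal sketch may help. The names WorkflowStatus and SchedulerProcessId are hypothetical and not existing SSP classes; only the status names and the <system_id>-<startup-timestamp> format come from the table.

```java
/** Workflow states as listed for the workflow_status column. */
enum WorkflowStatus {
    QUEUED, SCHEDULING, EXECUTING, COMPLETED, ERROR, FAILURE;

    /** COMPLETED, ERROR and FAILURE are the states that set workflow_stopped_date. */
    boolean isTerminal() {
        return this == COMPLETED || this == ERROR || this == FAILURE;
    }
}

/** Hypothetical helper identifying one application startup. */
final class SchedulerProcessId {
    private final String value;

    SchedulerProcessId(String systemId, long startupMillis) {
        // Preferred format from the design: <system_id>-<startup-timestamp>
        this.value = systemId + "-" + startupMillis;
    }

    String value() {
        return value;
    }

    /**
     * Single-node rule: a task locked in SCHEDULING by any other process
     * identifier has been abandoned and is eligible for restart.
     */
    boolean isAbandoned(WorkflowStatus status, String scheduledByProcess) {
        return status == WorkflowStatus.SCHEDULING && !value.equals(scheduledByProcess);
    }
}
```

The multi-node case would additionally need the heartbeat table mentioned in the table, which this sketch does not attempt to model.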

Task Queue Workflow

Creating Tasks

A domain-specific API (and they should be domain specific... no generic /ssp/api/1/bulk resource, please) which requires background processing is explicit in that regard. I.e. its request and response are explicitly designed to reflect the bulk and asynchronous nature of the underlying processing.

The POST request payload describes the proposed state change and a specification for targeting entities to be affected. The specification is usually one of:

  1. A list of entity IDs
  2. A query specification describing how to find affected entities
  3. No target specification, e.g. if the operation is intended strictly for entity creation

The successful result of such an API call is a persistent QueuedTask which is represented to the caller by a lightweight JSON struct. The only data the client requires is the QueuedTask's ID. It is probably easiest to just return "core" audit fields. Under no circumstances should either execution_state or workflow_status_desc be returned.
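
One possible shape for the target specification and the lightweight response is sketched below. All class names here (TargetSpec, QueuedTaskEntity, QueuedTaskResponse) are hypothetical, and the choice of which audit fields count as "core" is an assumption; the point is only that the response type has no way to carry execution_state or workflow_status_desc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.UUID;

/** Hypothetical target specification: explicit IDs, a deferred query, or neither. */
final class TargetSpec {
    final List<UUID> entityIds; // empty unless targeting by ID
    final String querySpec;     // null unless targeting by query

    private TargetSpec(List<UUID> entityIds, String querySpec) {
        this.entityIds = entityIds;
        this.querySpec = querySpec;
    }

    static TargetSpec ofIds(List<UUID> ids) {
        return new TargetSpec(Collections.unmodifiableList(new ArrayList<UUID>(ids)), null);
    }

    static TargetSpec ofQuery(String querySpec) {
        return new TargetSpec(Collections.<UUID>emptyList(), querySpec);
    }

    static TargetSpec none() {
        return new TargetSpec(Collections.<UUID>emptyList(), null);
    }
}

/** Hypothetical slice of the persistent entity; most columns are omitted. */
final class QueuedTaskEntity {
    final UUID id = UUID.randomUUID();
    final Date createdDate = new Date();
    UUID createdBy;
    String executionState = "{}"; // internal, never returned to the client
    String workflowStatusDesc;    // internal, never returned to the client
}

/** Hypothetical response struct: the ID plus core audit fields only. */
final class QueuedTaskResponse {
    final UUID id;
    final Date createdDate;
    final UUID createdBy;

    QueuedTaskResponse(QueuedTaskEntity task) {
        this.id = task.id;
        this.createdDate = task.createdDate;
        this.createdBy = task.createdBy;
    }
}
```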

The diagram below illustrates this process as a sequence diagram. It is mostly rote SSP CRUD behavior with the exception of the QueuedTaskExecutor responsible for serializing the proposed state change into a JSON string. This can be done very simply by instantiating a Jackson ObjectMapper and calling writeValueAsString(). The same QueuedTaskExecutor is expected to be responsible for performing the deserialization operation when the QueuedTask is popped off the queue for processing (shown in later diagrams). The assumption is that QueuedTaskExecutor instances will be Spring managed beans specific to each bulk Domain Svc method.

 

Scheduling Tasks

"Scheduling Tasks" is the process of finding a persistent QueuedTask which is not completed and which is not currently being processed, locking it, and placing it into a local execution queue.

A background job executing every 15s by default will be responsible for scheduling QueuedTasks. In a transaction it will, in the simplest case, attempt to locate at least one QueuedTask where workflow_status=QUEUED, change that field to SCHEDULING, and set the scheduling_started_date and scheduled_by_process.
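
The field updates involved in claiming a task could look roughly like the sketch below. ClaimableTask and TaskClaimer are hypothetical names, the in-memory object stands in for a Hibernate entity, and the surrounding transaction is assumed rather than shown. Clearing execution_started_date and workflow_status_desc follows the SCHEDULING description in the persistence table.

```java
import java.util.Date;

/** Hypothetical in-memory stand-in for the QueuedTask columns touched by scheduling. */
final class ClaimableTask {
    String workflowStatus = "QUEUED";
    Date schedulingStartedDate;
    String scheduledByProcess;
    Date executionStartedDate;
    String workflowStatusDesc;
}

final class TaskClaimer {
    /**
     * Attempts to take ownership of a task. Intended to run inside a single
     * transaction. Returns false if the task is not in a QUEUED state.
     */
    static boolean claim(ClaimableTask task, String processId, Date now) {
        if (!"QUEUED".equals(task.workflowStatus)) {
            return false;
        }
        task.workflowStatus = "SCHEDULING";
        task.schedulingStartedDate = now;
        task.scheduledByProcess = processId;
        // Reset fields left over from any earlier attempt.
        task.executionStartedDate = null;
        task.workflowStatusDesc = null;
        return true;
    }
}
```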

It will then instantiate a QueuedTaskRunnable, seed it with the QueuedTask's identifier, and pass that runnable to a Spring ThreadPoolTaskExecutor.

The QueuedTaskRunnable gets the QueuedTask's identifier, not a reference to the task itself, because the task will be executed in a separate thread and thus in a separate Hibernate Session.

The hand off to the async TaskExecutor ensures long-running QueuedTasks do not, within reason, block QueuedTasks behind them. ThreadPoolTaskExecutor resource pools should not be unbounded, though.

More detail on selecting QueuedTasks for scheduling:

  1. Always try to schedule the oldest task first
  2. Look for and try to restart stalled tasks before new tasks, i.e. before checking for workflow_status=QUEUED, look for (( workflow_status=QUEUED || workflow_status=SCHEDULING || workflow_status=EXECUTING ) && scheduled_by_process != currentQueuedTaskSchedulerId())
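
The two selection rules above can be sketched against an in-memory list as follows. TaskRow and TaskSelector are hypothetical names, and the real implementation would presumably be a DAO query. One assumption is labeled in the code: a task that has never been scheduled (null scheduled_by_process) is not treated as stalled, which mirrors how a SQL != comparison against NULL would behave.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical read-only view of the columns used for selection. */
final class TaskRow {
    final String id;
    final String workflowStatus;     // e.g. "QUEUED"
    final String scheduledByProcess; // null if never scheduled
    final long createdDate;          // epoch millis

    TaskRow(String id, String workflowStatus, String scheduledByProcess, long createdDate) {
        this.id = id;
        this.workflowStatus = workflowStatus;
        this.scheduledByProcess = scheduledByProcess;
        this.createdDate = createdDate;
    }
}

final class TaskSelector {
    /** Non-terminal task last owned by a different process. */
    static boolean isStalled(TaskRow t, String currentProcessId) {
        boolean inFlight = t.workflowStatus.equals("QUEUED")
                || t.workflowStatus.equals("SCHEDULING")
                || t.workflowStatus.equals("EXECUTING");
        // Assumption: never-scheduled tasks (null owner) are new, not stalled.
        return inFlight && t.scheduledByProcess != null
                && !t.scheduledByProcess.equals(currentProcessId);
    }

    /** Stalled tasks first, then new QUEUED tasks, oldest first within each group. */
    static List<TaskRow> select(List<TaskRow> all, String currentProcessId, int max) {
        List<TaskRow> stalled = new ArrayList<>();
        List<TaskRow> fresh = new ArrayList<>();
        for (TaskRow t : all) {
            if (isStalled(t, currentProcessId)) {
                stalled.add(t);
            } else if (t.workflowStatus.equals("QUEUED")) {
                fresh.add(t);
            }
        }
        Comparator<TaskRow> oldestFirst = Comparator.comparingLong(t -> t.createdDate);
        stalled.sort(oldestFirst);
        fresh.sort(oldestFirst);
        List<TaskRow> result = new ArrayList<>(stalled);
        result.addAll(fresh);
        return new ArrayList<>(result.subList(0, Math.min(max, result.size())));
    }
}
```

The max argument corresponds to a per-run scheduling cap; its name and placement here are illustrative only.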

Let's have a config for max QueuedTasks to schedule per job execution. In any case where the ThreadPoolTaskExecutor queue has been filled, the scheduling job should stop and await its next execution before scheduling any more tasks.

Errors that occur during creation of the QueuedTaskRunnable or during TaskExecutor.execute() invocation may result in the QueuedTask being set into an ERROR state, though it is hard to imagine how this might occur in practice. Do not treat resource exhaustion exceptions from TaskExecutor.execute() as fatal errors for the task itself. Just leave it in a SCHEDULING state so it can be rescheduled.
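
The "stop when the executor is full" behavior can be illustrated with a plain JDK ThreadPoolExecutor standing in for Spring's ThreadPoolTaskExecutor. This is a sketch under that substitution: the JDK pool signals exhaustion with RejectedExecutionException, whereas the Spring wrapper reports it as TaskRejectedException. BoundedDispatcher is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher around a bounded pool and a bounded work queue. */
final class BoundedDispatcher {
    private final ThreadPoolExecutor pool;

    BoundedDispatcher(int threads, int queueCapacity) {
        // Default rejection policy (AbortPolicy) throws when pool and queue are full.
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
    }

    /** Hands off runnables in order; returns how many were accepted. */
    int dispatch(List<Runnable> runnables) {
        int accepted = 0;
        for (Runnable r : runnables) {
            try {
                pool.execute(r);
                accepted++;
            } catch (RejectedExecutionException e) {
                // Resource exhaustion, not a task error: stop scheduling and
                // leave the remaining tasks in SCHEDULING for the next job run.
                break;
            }
        }
        return accepted;
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Note that nothing is written back to the task after a successful hand-off, in keeping with the rule that the scheduler must not update a QueuedTask once execute() has been called.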

Do not attempt to update the QueuedTask state after TaskExecutor.execute() since that work can technically complete before the scheduler's thread can complete its QueuedTask update.

QueuedTaskRunnable is a stateful component and not specific to the domain action targeted by the bulk API request, so it will likely not be a Spring-managed bean and will be instantiated directly by QueuedTaskService. QueuedTaskRunnable needs to be stateful to support the series of callbacks proposed below in the 'Executing Tasks' section.

 

Executing Tasks

"Executing Tasks" is the process of performing the actual work specified by a task, after the task has been fully processed by the scheduling mechanism described above. This is by far the most complicated step of the QueuedTask workflow.

Task execution involves at a minimum three transactions:

  1. Set workflow_status=EXECUTING and set the execution_started_date timestamp
  2. Perform the work
  3. Set workflow_status to a terminating status and set the workflow_stopped_date timestamp

Each is a separate transaction because step 2 may take arbitrarily long and itself take several transactions. So we cannot bind updates to the QueuedTask's lifecycle to the execution of its actual work, or at least not the first phase in the lifecycle (set workflow_status=EXECUTING). Technically, the second and third phases could be combined, at least for non-error paths, so long as the second phase uses the proposed execWithBatchedTaskContext() (see below). For error paths, though, the third transaction is likely unavoidable since a RuntimeException should be used to roll back the second transaction. Low level transaction management details aside, the layering goal is always to make QueuedTaskRunnable responsible for maintaining workflow_status while the 'Domain Service' is responsible for maintaining execution_state.
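
A minimal sketch of the three-phase structure, assuming each store call runs in its own transaction, is shown below. TaskStatusStore, RecordingStatusStore and QueuedTaskRunnableSketch are hypothetical names, security and Hibernate context setup is omitted, and every RuntimeException is mapped to FAILURE here for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for service/DAO calls; each method is one transaction. */
interface TaskStatusStore {
    void markExecuting(String taskId);
    void markStopped(String taskId, String terminalStatus, String desc);
}

/** In-memory store that records transitions, for illustration only. */
final class RecordingStatusStore implements TaskStatusStore {
    final List<String> history = new ArrayList<>();

    public void markExecuting(String taskId) {
        history.add(taskId + ":EXECUTING");
    }

    public void markStopped(String taskId, String terminalStatus, String desc) {
        history.add(taskId + ":" + terminalStatus);
    }
}

/** Holds only the task identifier, not the entity, since it runs on another thread. */
final class QueuedTaskRunnableSketch implements Runnable {
    private final String taskId;
    private final TaskStatusStore store;
    private final Runnable domainWork;

    QueuedTaskRunnableSketch(String taskId, TaskStatusStore store, Runnable domainWork) {
        this.taskId = taskId;
        this.store = store;
        this.domainWork = domainWork;
    }

    @Override
    public void run() {
        store.markExecuting(taskId); // transaction 1
        try {
            domainWork.run();        // transaction 2 (possibly several batches)
        } catch (RuntimeException e) {
            // Transaction 2 rolled back; record the outcome in transaction 3.
            store.markStopped(taskId, "FAILURE", e.toString());
            return;
        }
        store.markStopped(taskId, "COMPLETED", null); // transaction 3
    }
}
```

The runnable owns workflow_status while the domain work owns execution_state, which matches the layering goal stated above.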

The diagram below mistakenly excludes a call to ScheduledTaskWrapperService to set up security and Hibernate contexts for the third transaction. This should work the same way as the first transaction.

 

Since QueuedTasks are executed outside the context of a web request, the "normal" security and database connection context is not available to the task execution workflow. To work around this, we hope to reuse currently protected mechanisms in ScheduledTaskWrapperServiceImpl, represented below as execWithTaskContext() and execWithBatchedTaskContext(). The former already exists with that name; the latter currently exists as execBatchedTaskWithName(). Hopefully these methods can just be promoted to public, with the only modifications being to execBatchedTaskWithName() and some of the internals it calls, because we need the client to be able to specify the person for whom to initialize a security context, rather than just using the super user.

execBatchedTaskWithName() was created because long running jobs such as our external person sync would effectively create memory leaks when run under a single Hibernate Session. Any of our QueuedTasks which also potentially perform large amounts of work should strongly consider leveraging that same mechanism to chunk up their workloads. QueuedTasks can use the execution_state column to store state in between batches. The intention is for that field to contain a JSON-serialized HashMap. That column is also an excellent way for QueuedTasks to transactionally mark themselves complete and thus immune to accidental re-execution, without requiring that a transaction be held open across all batches.
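
To make the bookmarking idea concrete, here is a minimal sketch of one batch step driven by a state map. BatchedWorkSketch and the "nextIndex" and "done" keys are hypothetical. In a real task the map would be deserialized from execution_state before the batch and serialized back in the same transaction as the batch's work, which is not shown.

```java
import java.util.List;
import java.util.Map;

final class BatchedWorkSketch {
    static final String NEXT_INDEX = "nextIndex"; // hypothetical bookmark key
    static final String DONE = "done";            // hypothetical completion marker

    /**
     * Processes at most batchSize targets starting from the bookmark in state.
     * Returns true if more batches remain. The processed list stands in for
     * the real side effect, e.g. sending one email per person.
     */
    static boolean runOneBatch(List<String> personIds, Map<String, Object> state,
                               int batchSize, List<String> processed) {
        if (Boolean.TRUE.equals(state.get(DONE))) {
            return false; // already complete; a restart must not redo work
        }
        int start = state.containsKey(NEXT_INDEX) ? (Integer) state.get(NEXT_INDEX) : 0;
        int end = Math.min(start + batchSize, personIds.size());
        for (int i = start; i < end; i++) {
            processed.add(personIds.get(i));
        }
        state.put(NEXT_INDEX, end); // bookmark progress for the next batch or restart
        if (end >= personIds.size()) {
            state.put(DONE, Boolean.TRUE);
        }
        return end < personIds.size();
    }
}
```

A restart after the first batch would reload the saved map and resume at nextIndex, so targets handled in already-committed batches are not processed twice.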

As noted above, QueuedTaskRunnable will be a stateful component such that it can fire the correct step in the current Task's execution for each of the three transactional callbacks.

The QueuedTaskExecutor and 'Domain Service' should make an attempt to distinguish (via exception type) between permanent and transient errors such that the QueuedTaskRunnable can know whether to allow the QueuedTask to be retried or to set it into a permanent FAILURE status. It is up to the 'Domain Service' to ensure that a QueuedTask retry neither re-executes work from an already-committed transaction nor endlessly retries the same work. These safeguards would typically be implemented by manipulating execution_state. (It may be worth considering implementing a strictly bounded retry limit at the QueuedTaskRunnable layer. But this can be deferred for the time being until we understand more about the failure modes of the specific domain operations being implemented.)
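
One way the exception-type distinction might be expressed is sketched below. Everything here is an assumption for illustration: the exception class names, the idea of returning a task to QUEUED to permit a retry, and the bounded attempt count, which the design explicitly defers.

```java
/** Hypothetical marker for errors that may succeed on a later attempt. */
class TransientTaskException extends RuntimeException {
    TransientTaskException(String message) {
        super(message);
    }
}

/** Hypothetical marker for errors that will never succeed on retry. */
class PermanentTaskException extends RuntimeException {
    PermanentTaskException(String message) {
        super(message);
    }
}

final class RetryDecisionSketch {
    /**
     * Chooses the next workflow_status after a failed attempt. Only errors
     * explicitly marked transient are retried, and only up to maxAttempts.
     * Unrecognized exception types are treated as permanent.
     */
    static String nextStatus(RuntimeException error, int attemptsSoFar, int maxAttempts) {
        if (error instanceof TransientTaskException && attemptsSoFar < maxAttempts) {
            return "QUEUED"; // assumption: re-queueing is how a retry is permitted
        }
        return "FAILURE";
    }
}
```

Treating unknown exceptions as permanent is the conservative choice here, since it avoids endless retries when a domain service has not classified its errors.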