SSP Technical Design: Bulk Actions
Overview
"Actionable Search Result/Caseload" features, e.g. bulk program status change, bulk early alert submission, etc, require asynchronous processing since users can easily submit "actions" which we cannot guarantee will complete within a reasonable time frame for the HTTP request/response cycle. SSP needs a persistent task queue to support this asynchronous processing.
For SSP 2.6.0, the first bulk actions to be supported are:
- Export Search/Caseload
- Program Status Change
- Watch/Unwatch
- Send Email to Search/Caseload
Export Search/Caseload should not be any more heavy-weight than the original read operation that produced the exportable dataset, but may need to be arbitrarily bounded to prevent excessively large downloads (we know some external_person tables have ~1 million rows). Watch/Unwatch should, in theory, be no more heavy-weight than the bulk caseload reassignment functionality we already support, but again, the result sets being manipulated are potentially much larger, so it may also need to be bounded. Even if those two operations can be bounded such that they do not require asynchronous processing, Program Status Change and Send Email to Search/Caseload are heavy-weight enough that they likely do require a task queue, even for run-of-the-mill batch sizes. And we know that batch submission of Early Alerts, which is even more heavy-weight, is likely the next bulk action to be implemented. So building a generic task queue in the 2.6.0 time frame is justifiable.
The Task Queue
The Task Queue will consist of:
- Database table - For storing the queue itself
- Task queue service/DAO - Standard app-layer technical infrastructure for encapsulating the database table
- Task scheduler - Background job for popping tasks off the queue
- Task executor - For dispatching the work asynchronously w/r/t the scheduled job. Prevents long-running jobs from clogging the queue.
- APIs - Instead of operating on entities one-by-one, bulk operation requests will carry payloads specifying both the proposed state change and either a list of entities to be modified or a query specification for deferred calculation of such a list. Responses will be pointers to the created task queue entry. An API for browsing the queue itself is likely out of scope at this time.
Task Queue Persistence
Column | Type | Nullable | Description |
---|---|---|---|
id | UUID | No | Standard PK field |
created_date | timestamp | No | Standard audit field |
created_by | UUID | No | Standard audit field. FK to person.id |
modified_date | timestamp | No | Standard audit field |
modified_by | UUID | No | Standard audit field. FK to person.id |
owner_id | UUID | No | Actual end user requesting the work. Similar to map_plan.owner_id . FK to person.id |
run_as_id | UUID | No | The person as whom the work should execute. Typically set to owner_id except in cases where the job must run with elevated, system-level permissions. FK to person.id |
execution_component_name | varchar | No | Spring bean name to which work execution should delegate. Bean must implement QueuedTaskExecutor. See below. |
execution_spec | text | Yes | JSON-encoded Java Pojo representing the originally proposed unit of work. E.g. for bulk mail send, a JSON struct containing the requested subject, body, CC addrs, and list of targeted person IDs. Typically deserialized as part of QueuedTaskExecutor.execute() internals. |
execution_state | text | No | Typically JSON-encoded state of task-execution-in-progress. Long-running tasks may choose to batch their operations into multiple transactions. This allows such work to bookmark progress such that restarts do not result in duplicate operations, and without modifying the original execution_spec . Default to '{}', i.e. the empty map. |
workflow_status | varchar | No | Enum: QUEUED, SCHEDULING, EXECUTING, COMPLETED, ERROR, FAILURE |
workflow_status_desc | text | Yes | Free-form text adding detail to workflow_status . Usually only set if status is ERROR or FAILURE , in which case it is usually set to a stack trace or error code. Error code preferred (similar to LTI). |
scheduling_started_date | timestamp | Yes | Moment at which task was popped off the queue and was set to a SCHEDULING status. |
scheduled_by_process | varchar | Yes | Opaque string which identifies the process which owns the task lock represented by a SCHEDULING status. Preferred identifier format: |
execution_started_date | timestamp | Yes | Moment at which task exited the scheduling mechanism and was set into an EXECUTING status. |
workflow_stopped_date | timestamp | Yes | Moment at which task transitioned into a COMPLETED , ERROR , or FAILURE state. |
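For reference, the workflow_status lifecycle values referenced throughout this document could be captured in an enum roughly like the following sketch (the type name and comments are illustrative only):

```java
// Sketch of the workflow_status values used in this design. The ERROR vs.
// FAILURE distinction is discussed under 'Executing Tasks' below.
public enum QueuedTaskWorkflowStatus {
    QUEUED,     // persisted and awaiting pickup by the scheduling job
    SCHEDULING, // locked by a scheduler process, awaiting a worker thread
    EXECUTING,  // work in progress under a QueuedTaskExecutor
    COMPLETED,  // terminated normally
    ERROR,      // terminated abnormally (see the error-handling discussion below)
    FAILURE     // terminated abnormally and permanently (no retry)
}
```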
Task Queue Workflow
Creating Tasks
A domain-specific API (and they should be domain-specific... no generic /ssp/api/1/bulk resource, please) which requires background processing is explicit in that regard, i.e. its request and response are explicitly designed to reflect the bulk and asynchronous nature of the underlying processing.
The POST request payload describes the proposed state change and a specification for targeting entities to be affected. The specification is usually one of:
- A list of entity IDs
- A query specification describing how to find affected entities
- No target specification, e.g. if the operation is intended strictly for entity creation
The successful result of such an API call is a persistent QueuedTask which is represented to the caller by a lightweight JSON struct. The only data the client requires is the QueuedTask's ID. It is probably easiest to just return "core" audit fields. Under no circumstances should execution_state or workflow_status_desc be returned.
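As a purely illustrative sketch of that request/response shape (the class and field names here are hypothetical, not an agreed API), a bulk program status change might be expressed with transfer objects along these lines:

```java
import java.util.Date;
import java.util.List;
import java.util.UUID;

// Hypothetical request TO: the proposed state change plus one of the targeting
// options described above (explicit IDs or a deferred query specification).
class BulkProgramStatusChangeRequestTO {
    private UUID programStatusId;          // the proposed state change
    private List<UUID> personIds;          // option 1: explicit list of targets
    private CaseloadSearchSpecTO criteria; // option 2: query specification (placeholder type)
    // getters/setters omitted
}

// Lightweight pointer to the created QueuedTask returned to the caller.
// Deliberately excludes execution_state and workflow_status_desc.
class QueuedTaskReferenceTO {
    private UUID id;              // the only field the client strictly needs
    private Date createdDate;     // "core" audit fields are cheap to include
    private String workflowStatus;
    // getters/setters omitted
}
```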
The diagram below illustrates this process as a sequence diagram. It is mostly rote SSP CRUD behavior with the exception of the QueuedTaskExecutor responsible for serializing the proposed state change into a JSON string. This can be done very simply by instantiating a Jackson ObjectMapper and calling writeValueAsString(). The same QueuedTaskExecutor is expected to be responsible for performing the deserialization operation when the QueuedTask is popped off the queue for processing (shown in later diagrams). The assumption is that QueuedTaskExecutor instances will be Spring-managed beans specific to each bulk Domain Svc method.
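A rough sketch of how that serialize/deserialize responsibility could sit on the executor contract follows. The interface shape, generics, and the BulkEmailSpecTO/QueuedTask accessors are assumptions for illustration; only the ObjectMapper calls (writeValueAsString()/readValue()) are known quantities (the com.fasterxml import assumes Jackson 2.x; a Jackson 1.x ObjectMapper exposes the same two methods).

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: the real QueuedTaskExecutor shape is not settled.
interface QueuedTaskExecutor<T> {

    // Called at task-creation time to produce the execution_spec column value.
    String serializeExecutionSpec(T spec) throws Exception;

    // Called when the task is popped off the queue; deserializes execution_spec
    // and performs the domain work.
    void execute(QueuedTask task) throws Exception;
}

// Illustrative implementation fragment for a bulk email send.
class BulkEmailQueuedTaskExecutor implements QueuedTaskExecutor<BulkEmailSpecTO> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public String serializeExecutionSpec(BulkEmailSpecTO spec) throws Exception {
        return mapper.writeValueAsString(spec); // stored in execution_spec
    }

    @Override
    public void execute(QueuedTask task) throws Exception {
        BulkEmailSpecTO spec =
                mapper.readValue(task.getExecutionSpec(), BulkEmailSpecTO.class);
        // ... delegate to the bulk email 'Domain Service' with the deserialized spec ...
    }
}
```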
Scheduling Tasks
"Scheduling Tasks" is the process of finding a persistent QueuedTask
which is not completed and which is not currently being processed, locking it, and placing it into a local execution queue.
A background job executing every 15s by default will be responsible for scheduling QueuedTasks
. In a transaction it will, in the simplest case, attempt to locate at least one QueuedTask
where workflow_status=QUEUED
, change that field to SCHEDULING
, and set the scheduling_started_date
and scheduled_by_process.
It will then instantiate a QueuedTaskRunnable
, seeding it with the QueuedTask
's identifier, and pass the former to a Spring ThreadPoolTaskExecutor
backed.
The QueuedTaskRunnable gets the QueuedTask's identifier, not a reference to the task itself, because the task will be executed in a separate thread and thus in a separate Hibernate Session.
The hand-off to the async TaskExecutor ensures long-running QueuedTasks do not, within reason, block QueuedTasks behind them. ThreadPoolTaskExecutor resource pools should not be unbounded, though.
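For illustration, a bounded pool could be declared roughly as follows (Java config shown only for brevity; the bean name and sizes are placeholders and would presumably come from SSP configuration):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class QueuedTaskExecutorConfig {

    // Bounded pool: once the queue fills, TaskExecutor.execute() throws
    // TaskRejectedException rather than accepting unbounded work.
    @Bean
    public ThreadPoolTaskExecutor queuedTaskThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);   // placeholder sizes; should be configurable
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("queued-task-");
        return executor;
    }
}
```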
More detail on selecting QueuedTasks for scheduling:
- Always try to schedule the oldest task first
- Look for and try to restart stalled tasks before new tasks, i.e. before checking for workflow_status=QUEUED, look for ((workflow_status=QUEUED || workflow_status=SCHEDULING || workflow_status=EXECUTING) && scheduled_by_process != currentQueuedTaskSchedulerId())

Let's have a config for the max QueuedTasks to schedule per job execution. In any case where the ThreadPoolTaskExecutor queue has been filled, the scheduling job should stop and await its next execution before scheduling any more tasks.
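In Hibernate terms the selection pass might look roughly like the sketch below. The QueuedTask entity and property names assume a mapping onto the table above, the status literals are purely illustrative, and the process identifier parameter is the value written to scheduled_by_process:

```java
import java.util.List;
import org.hibernate.Session;

// Sketch of the selection pass as a DAO fragment: stalled tasks first, then
// brand-new QUEUED tasks, oldest first, never exceeding the per-run maximum.
class QueuedTaskDao {

    @SuppressWarnings("unchecked")
    List<QueuedTask> findSchedulableTasks(Session session, String thisProcessId, int maxTasks) {
        // Stalled tasks: claimed by some other scheduler process.
        List<QueuedTask> tasks = session.createQuery(
                "from QueuedTask qt "
              + "where qt.workflowStatus in ('QUEUED','SCHEDULING','EXECUTING') "
              + "and qt.scheduledByProcess != :thisProcess "
              + "order by qt.createdDate asc")            // oldest first
            .setParameter("thisProcess", thisProcessId)
            .setMaxResults(maxTasks)
            .list();
        if (tasks.size() < maxTasks) {
            // Then new tasks still sitting in the queue, also oldest first.
            tasks.addAll(session.createQuery(
                    "from QueuedTask qt where qt.workflowStatus = 'QUEUED' "
                  + "order by qt.createdDate asc")
                .setMaxResults(maxTasks - tasks.size())
                .list());
        }
        return tasks;
    }
}
```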
Errors that occur during QueuedTaskRunnable creation or TaskExecutor.execute() invocation may result in the QueuedTask being set into an ERROR state, though it is hard to imagine how this might occur in practice. Do not treat resource exhaustion exceptions from TaskExecutor.execute() as fatal errors for the task itself. Just leave the task in a SCHEDULING state so it can be rescheduled.
Do not attempt to update the QueuedTask state after TaskExecutor.execute() since that work can technically complete before the scheduler's thread can complete its QueuedTask update.
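Pulling those rules together, one scheduling pass might look roughly like the sketch below. QueuedTaskService, the QueuedTaskRunnable constructor arguments, the setters on QueuedTask, and maxTasksPerRun are assumptions based on this design rather than existing code; transaction demarcation is elided, and whether this pass lives in the job itself or inside QueuedTaskService is left open.

```java
import java.util.Date;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;

// Sketch of the background scheduling job (runs every ~15s in its own transaction).
public class QueuedTaskSchedulingJob {

    private QueuedTaskService queuedTaskService;            // assumed collaborator
    private ScheduledTaskWrapperService taskWrapperService; // assumed collaborator
    private TaskExecutor taskExecutor;                      // the bounded pool above
    private int maxTasksPerRun;                             // proposed config value
    private String schedulerProcessId;                      // written to scheduled_by_process

    public void scheduleQueuedTasks() {
        for (QueuedTask task : queuedTaskService.findSchedulableTasks(schedulerProcessId, maxTasksPerRun)) {
            // Lock the task for this scheduler process.
            task.setWorkflowStatus(QueuedTaskWorkflowStatus.SCHEDULING);
            task.setSchedulingStartedDate(new Date());
            task.setScheduledByProcess(schedulerProcessId);
            queuedTaskService.save(task);
            try {
                // Hand off only the identifier; the worker reloads the task in its
                // own thread and Hibernate Session.
                taskExecutor.execute(new QueuedTaskRunnable(
                        task.getId(), queuedTaskService, taskWrapperService));
            } catch (TaskRejectedException e) {
                // Pool/queue exhausted: not fatal for the task. Leave it in
                // SCHEDULING so a later (stalled-task) pass can reclaim it, and
                // stop scheduling until the job's next execution.
                break;
            }
            // Deliberately no status update after execute(): the worker thread may
            // already have moved the task past EXECUTING before we could write it.
        }
    }
}
```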
QueuedTaskRunnable is a stateful component and not specific to the domain action targeted by the bulk API request, so it will likely not be a Spring-managed bean and will instead be instantiated directly by QueuedTaskService. QueuedTaskRunnable needs to be stateful to support the series of callbacks proposed below in the 'Executing Tasks' section.
Executing Tasks
"Executing Tasks" is the process of performing the actual work specified by a task, after the task has been fully processed by the scheduling mechanism described above. This is by far the most complicated step of the QueuedTask
workflow.
Task execution involves at a minimum three transactions:
1. Set workflow_status=EXECUTING and set the execution_started_date timestamp
2. Perform the work
3. Set workflow_status to a terminating status and set the workflow_stopped_date timestamp
Each is a separate transaction because step 2 may take arbitrarily long and may itself span several transactions. So we cannot bind updates to the QueuedTask's lifecycle to the execution of its actual work, or at least not the first phase in the lifecycle (set workflow_status=EXECUTING). Technically, the second and third phases could be combined, at least for non-error paths, so long as the second phase uses the proposed execWithBatchedTaskContext() (see below). For error paths, though, the third transaction is likely unavoidable since a RuntimeException should be used to roll back the second transaction. Low-level transaction management details aside, the layering goal is always to make QueuedTaskRunnable responsible for maintaining workflow_status while the 'Domain Service' is responsible for maintaining execution_state.
Note the use of ScheduledTaskWrapperService to set up security and Hibernate contexts for the third transaction. This should work the same way as the first transaction.
Since QueuedTasks are executed outside the context of a web request, the "normal" security and database connection context is not available to the task execution workflow. To work around this, we hope to reuse currently protected mechanisms in ScheduledTaskWrapperServiceImpl, represented below as execWithTaskContext() and execWithBatchedTaskContext(). The former already exists with that name; the latter is currently execBatchedTaskWithName(). Hopefully these methods can just be promoted to public, and the only modifications will be to execBatchedTaskWithName() and some of the internals it calls, because we need the client to be able to specify the person for whom to initialize a security context, rather than just use the super user.
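Assuming those wrapper methods end up with signatures roughly like the ones used below (the argument lists are guesses, and the mark*/executeTask methods on QueuedTaskService are placeholders), the three-transaction structure could be expressed in QueuedTaskRunnable along these lines:

```java
import java.util.UUID;

// Sketch of QueuedTaskRunnable's three-phase lifecycle. Wrapper signatures are
// guesses at how the currently protected ScheduledTaskWrapperServiceImpl methods
// might look once promoted to public.
public class QueuedTaskRunnable implements Runnable {

    private final UUID queuedTaskId;
    private final QueuedTaskService queuedTaskService;
    private final ScheduledTaskWrapperService taskWrapperService;

    public QueuedTaskRunnable(UUID queuedTaskId,
                              QueuedTaskService queuedTaskService,
                              ScheduledTaskWrapperService taskWrapperService) {
        this.queuedTaskId = queuedTaskId;           // identifier only; the task is
        this.queuedTaskService = queuedTaskService; // reloaded in this thread's Session
        this.taskWrapperService = taskWrapperService;
    }

    @Override
    public void run() {
        // Transaction 1: mark the task EXECUTING and stamp execution_started_date.
        taskWrapperService.execWithTaskContext(new Runnable() {
            public void run() {
                queuedTaskService.markExecuting(queuedTaskId);
            }
        });
        try {
            // Transaction(s) 2: the actual work, batched so long-running tasks do not
            // hold one Session/transaction open. The Domain Service maintains
            // execution_state; this class only ever touches workflow_status.
            // (execWithBatchedTaskContext() would also need the task's run_as person
            // so the security context is not simply the super user.)
            taskWrapperService.execWithBatchedTaskContext(new Runnable() {
                public void run() {
                    queuedTaskService.executeTask(queuedTaskId); // dispatches to the QueuedTaskExecutor bean
                }
            });
            // Transaction 3 (success): terminal status plus workflow_stopped_date.
            taskWrapperService.execWithTaskContext(new Runnable() {
                public void run() {
                    queuedTaskService.markCompleted(queuedTaskId);
                }
            });
        } catch (final RuntimeException e) {
            // Transaction 3 (error): record ERROR or FAILURE (plus workflow_status_desc)
            // depending on whether the failure looks transient or permanent.
            taskWrapperService.execWithTaskContext(new Runnable() {
                public void run() {
                    queuedTaskService.markStopped(queuedTaskId, e);
                }
            });
        }
    }
}
```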
execBatchedTaskWithName() was created because long-running jobs such as our external person sync would effectively create memory leaks when run under a single Hibernate Session. Any of our QueuedTasks which also potentially perform large amounts of work should strongly consider leveraging that same mechanism to chunk up their workloads. QueuedTasks can use the execution_state column to store state in between batches. The intention is for that field to contain a JSON-serialized HashMap. That column is also an excellent way for QueuedTasks to transactionally mark themselves complete and thus immune to accidental re-execution, without requiring that a transaction be held open across all batches.
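As an illustration (the key names, QueuedTask accessors, and helper class are placeholders), a Domain Service chunking its work might read and write that bookmark roughly like this:

```java
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustration of using execution_state as a between-batches bookmark.
class BulkEmailBatchBookmark {

    private final ObjectMapper mapper = new ObjectMapper();

    // Read the bookmark at the start of a batch; '{}' (the column default)
    // deserializes to an empty map.
    Map<String, Object> readState(QueuedTask task) throws Exception {
        return mapper.readValue(task.getExecutionState(),
                new TypeReference<HashMap<String, Object>>() {});
    }

    // Persist progress at the end of each batched transaction so a restart
    // resumes after the last committed chunk instead of repeating it.
    void writeState(QueuedTask task, int lastProcessedIndex, boolean complete)
            throws Exception {
        Map<String, Object> state = new HashMap<String, Object>();
        state.put("lastProcessedIndex", lastProcessedIndex);
        state.put("complete", complete); // lets the task mark itself done transactionally
        task.setExecutionState(mapper.writeValueAsString(state));
    }
}
```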
As noted above, QueuedTaskRunnable will be a stateful component such that it can fire the correct step in the current Task's execution for each of the three transactional callbacks.
The QueuedTaskExecutor and 'Domain Service' should make an attempt to distinguish (via exception type) between permanent and transient errors such that the QueuedTaskRunnable can know whether to allow the QueuedTask to be retried or to set it into a permanent FAILURE status. It is up to the 'Domain Service' to ensure that a QueuedTask retry neither re-executes work already committed in a prior transaction nor endlessly retries the same work. These safeguards would typically be implemented by manipulating execution_state. (It may be worth considering implementation of a strictly bounded retry limit at the QueuedTaskRunnable layer. But this can be deferred for the time being until we understand more about the failure modes of the specific domain operations being implemented.)
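One way to express that distinction would be dedicated unchecked exception types that the 'Domain Service' throws and QueuedTaskRunnable inspects; the class names below are hypothetical:

```java
// Hypothetical exception types for signaling retryability upward from the
// QueuedTaskExecutor / 'Domain Service' layer.
class TransientQueuedTaskException extends RuntimeException {
    TransientQueuedTaskException(String message, Throwable cause) {
        super(message, cause);
    }
}

class PermanentQueuedTaskException extends RuntimeException {
    PermanentQueuedTaskException(String message, Throwable cause) {
        super(message, cause);
    }
}

// QueuedTaskRunnable's error path (markStopped() in the sketch above) could then
// decide the terminal status, e.g.:
//   catch (PermanentQueuedTaskException e) { /* FAILURE: never retried          */ }
//   catch (RuntimeException e)             { /* ERROR: may be retried (bounded) */ }
```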