Skip to content

add unit test coverage for WorkerBatchInternalQueue #5552

@aglinxinyuan

Description

@aglinxinyuan

Background

WorkerBatchInternalQueue in engine/architecture/pythonworker currently lacks a dedicated unit-spec. It is a trait providing the per-DP-thread mailbox used by the Python worker, with two-priority queues (data + control) and per-channel in-flight byte accounting via getQueuedCredit.

Behavior to pin

Surface Contract
enqueueData + getElement round-trips a DataElement
enqueueCommand + getElement round-trips a ControlElement
enqueueActorCommand + getElement round-trips an ActorCommandElement
Multi-priority dispatch control elements are returned before data elements when both are queued (queue priorities 0 < 1)
getDataQueueLength reports the count of data-queue items (excludes control)
getControlQueueLength / isControlQueueEmpty reports the count of control-queue items
disableDataQueue stops getElement from picking from the data queue (control items still flow)
enableDataQueue re-enables the data queue
getQueuedCredit(sender) equals 0 when nothing has been enqueued
getQueuedCredit after enqueueData(DataFrame) tracks bytes IN minus bytes OUT for that sender
getQueuedCredit after getElement consumes the data returns to 0 (in == out)
getQueuedCredit for non-DataFrame payloads does not increment in/out byte counters
getQueuedCredit across distinct senders keeps each sender's accounting independent

Scope

  • New spec file: WorkerBatchInternalQueueSpec.scala (matches the <srcClassName>Spec.scala convention).
  • The trait is instantiated through a small test-only subclass.
  • No production-code changes.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No fields configured for Task.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions