Add shared-memory arena framework for iterate_in_subprocess (#1520)
Merged
@moto-meta has exported this pull request. If you are a Meta employee, you can view the originating Diff in D107709116.
Summary:
Adds the backend-agnostic machinery for moving large binary payloads (large `bytes`, NumPy arrays, Torch tensors) between the worker and parent of `iterate_in_subprocess` through shared memory instead of pickling them through the result queue. This commit contains only the framework; concrete buffer backends are added in following commits.

New private subpackage `spdl/pipeline/_arena/`:

- `_protocol.py`: structural `ArenaProtocol` / `ArenaWriterProtocol` / `ArenaReaderProtocol` interfaces. `iterate_in_subprocess` accepts any object satisfying `ArenaProtocol` through one `buffer` argument and dispatches on its type, so additional backends need no signature change. A unit (one pipeline item, possibly several binaries) is bracketed by `begin_unit` / `commit_unit` on the writer and returned with `end_unit` on the reader, so reservation and return happen in bulk per unit.
- `_marker.py`: `_ShmMarker`, the small descriptor that replaces an offloaded binary in the pickled envelope.
- `_registry.py`: decides which objects are offloaded and how. A handler exposes an object's bytes as a zero-copy source buffer (`get_buffer`) so the arena copies into shared memory exactly once, and rebuilds it from a destination buffer (`from_buffer`), returning the restored object plus a lifetime anchor for zero-copy views. The `bytes` handler is pure Python; NumPy and Torch are reached through `lazy_import` so `import spdl.pipeline` works without them and they never become hard dependencies.
- `_offload.py`: `offload` / `restore`, which divert offloadable leaves of an arbitrarily nested object into the arena via a pickler `persistent_id` / `persistent_load` pair while the rest is pickled normally. The envelope carries an 8-byte unit span so the reader can return the whole region in one call.

`iterate_in_subprocess` gains a `buffer: ArenaProtocol | None` argument. When provided, the worker offloads each item before putting it on the queue and the parent restores it as the first step; the arena is created and owned by the parent, passed to the worker, and closed and unlinked at teardown after the worker is confirmed dead. With `buffer=None` the behavior is unchanged.

Reviewed By: huangruizhe

Differential Revision: D107709116
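For readers unfamiliar with the `persistent_id` / `persistent_load` mechanism mentioned for `_offload.py`, the following is a minimal sketch of the general technique, not the code in this PR. It uses only the standard library. The `bytearray` arena, the `THRESHOLD` cutoff, the `("shm", offset, size)` marker tuple, and the signatures of `offload` / `restore` are all assumptions made for illustration; the real implementation uses `_ShmMarker`, the arena protocols, and registry handlers, and also writes a unit span that is omitted here.

```python
import io
import pickle
from typing import Any

# Hypothetical cutoff for "large" payloads; the PR does not state a value.
THRESHOLD = 1024


class _OffloadPickler(pickle.Pickler):
    """Diverts large `bytes` leaves into an arena and pickles a marker instead."""

    def __init__(self, file: io.BytesIO, arena: bytearray) -> None:
        super().__init__(file, protocol=pickle.HIGHEST_PROTOCOL)
        self._arena = arena  # stand-in for a shared-memory region

    def persistent_id(self, obj: Any) -> Any:
        if type(obj) is bytes and len(obj) >= THRESHOLD:
            offset = len(self._arena)
            self._arena += obj  # single copy into the arena
            # Illustrative marker; plays the role of a small descriptor.
            return ("shm", offset, len(obj))
        return None  # everything else is pickled normally


class _RestoreUnpickler(pickle.Unpickler):
    """Resolves markers back into objects by reading from the arena."""

    def __init__(self, file: io.BytesIO, arena: bytearray) -> None:
        super().__init__(file)
        self._arena = arena

    def persistent_load(self, pid: Any) -> Any:
        tag, offset, size = pid
        if tag != "shm":
            raise pickle.UnpicklingError(f"unknown persistent id: {pid!r}")
        return bytes(self._arena[offset : offset + size])


def offload(obj: Any, arena: bytearray) -> bytes:
    """Return a small pickled envelope; large binaries land in `arena`."""
    out = io.BytesIO()
    _OffloadPickler(out, arena).dump(obj)
    return out.getvalue()


def restore(envelope: bytes, arena: bytearray) -> Any:
    """Rebuild the original object from the envelope plus the arena."""
    return _RestoreUnpickler(io.BytesIO(envelope), arena).load()
```

In this sketch the envelope stays small no matter how large the nested payloads are, because only the marker tuples travel through the pickle stream. A real backend would replace the `bytearray` with a shared-memory region and would avoid the final `bytes(...)` copy by returning a zero-copy view plus a lifetime anchor, as the summary describes.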