webknossos.dataset.view

#   class View:

A View is essentially a bounding box on a region of a specific wkw.Dataset, together with read and write functionality. Read and write operations are restricted to the bounding box. Views are designed to be easily passed around as parameters. A View, in its most basic form, does not hold a reference to the webknossos.dataset.dataset.Dataset.

#   View( path_to_mag_view: pathlib.Path, header: wkw.wkw.Header, size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]], global_offset: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]], is_bounded: bool = True, read_only: bool = False, mag_view_bbox_at_creation: Union[webknossos.geometry.bounding_box.BoundingBox, NoneType] = None )

Do not use this constructor manually. Instead use webknossos.dataset.mag_view.MagView.get_view() to get a View.

#   header: wkw.wkw.Header
#   read_only: bool
#   def write( self, data: numpy.ndarray, offset: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]] = Vec3Int(0,0,0) ) -> None:

Writes the data at the specified offset to disk. The offset is relative to global_offset.

Note that writing compressed data which is not aligned with the blocks on disk may result in diminished performance, as full blocks will automatically be read to pad the write actions.
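To see why unaligned writes to compressed data are slower, consider which on-disk blocks a write touches: every partially covered block must first be read back so it can be padded. A minimal sketch of the alignment arithmetic per axis (the block edge length of 32 is an assumption for illustration; the actual value comes from the dataset's wkw header):

```python
# Sketch: compute the block-aligned span that a 1D write at `offset` with
# `size` voxels touches, assuming a hypothetical block edge length of 32.
BLOCK_LEN = 32

def touched_block_range(offset: int, size: int, block_len: int = BLOCK_LEN) -> tuple:
    # Round the start down and the end up to block boundaries.
    start = (offset // block_len) * block_len
    end = -(-(offset + size) // block_len) * block_len  # ceil division
    return (start, end)

# An aligned write covers exactly the blocks it overwrites:
assert touched_block_range(32, 64) == (32, 96)
# An unaligned write of the same extent touches extra, partially covered
# blocks, which must be read and padded first:
assert touched_block_range(40, 50) == (32, 96)
```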

#   def read( self, offset: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]] = Vec3Int(0,0,0), size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int], NoneType] = None ) -> numpy.ndarray:

The user can specify the offset and the size of the requested data. The offset is relative to global_offset. If no size is specified, the size of the view is used. If the specified bounding box exceeds the data on disk, the rest is padded with 0.

Returns the specified data as a np.array.

Example:

import numpy as np

# ...
# let 'mag1' be a `MagView`
view = mag1.get_view(offset=(10, 20, 30), size=(100, 200, 300))

assert np.array_equal(
    view.read(offset=(0, 0, 0), size=(100, 200, 300)),
    view.read(),
)

# works because the specified data is completely in the bounding box of the view
some_data = view.read(offset=(50, 60, 70), size=(10, 120, 230))

# fails because the specified data is not completely in the bounding box of the view
more_data = view.read(offset=(50, 60, 70), size=(999, 120, 230))
#   def read_bbox( self, bounding_box: Union[webknossos.geometry.bounding_box.BoundingBox, NoneType] = None ) -> numpy.ndarray:

The user can specify the bounding_box of the requested data. See read() for more details.

#   def get_view( self, offset: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]] = Vec3Int(0,0,0), size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int], NoneType] = None, read_only: Union[bool, NoneType] = None ) -> webknossos.dataset.view.View:

Returns a view that is limited to the specified bounding box. The offset is relative to global_offset. If no size is specified, the size of the view is used.

The offset and size may only exceed the bounding box of the current view, if read_only is set to True.

If read_only is True, write operations are not allowed for the returned sub-view.

Example:

# ...
# let 'mag1' be a `MagView`
view = mag1.get_view(offset=(10, 20, 30), size=(100, 200, 300))

# works because the specified sub-view is completely in the bounding box of the view
sub_view = view.get_view(offset=(50, 60, 70), size=(10, 120, 230))

# fails because the specified sub-view is not completely in the bounding box of the view
invalid_sub_view = view.get_view(offset=(50, 60, 70), size=(999, 120, 230))

# works because `read_only=True`
invalid_sub_view = view.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)
#   def get_buffered_slice_writer( self, offset: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]] = Vec3Int(0,0,0), buffer_size: int = 32, dimension: int = 2 ) -> webknossos.dataset._utils.buffered_slice_writer.BufferedSliceWriter:

The BufferedSliceWriter buffers multiple slices before they are written to disk. The number of slices that are buffered is specified by buffer_size. As soon as the buffer is full, the data is written to disk.

The user can specify along which dimension the data is sliced by using the parameter dimension. To slice along the x-axis use 0, for the y-axis use 1, or for the z-axis use 2 (default: dimension=2).

The BufferedSliceWriter must be used as context manager using the with syntax (see example below), which results in a generator consuming np.ndarray-slices via writer.send(slice). Exiting the context will automatically flush any remaining buffered data to disk.

Usage:

data_cube = ...
view = ...
with view.get_buffered_slice_writer() as writer:
    for data_slice in data_cube:
        writer.send(data_slice)
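Slicing along dimension 2 corresponds to iterating over the z-axis of a data cube. A small illustration with plain numpy (independent of webknossos) of what one slice per send() call looks like:

```python
import numpy as np

# A toy cube with axes (x, y, z); in webknossos data additionally has a
# leading channel axis, which is omitted here for simplicity.
data_cube = np.arange(2 * 3 * 4).reshape(2, 3, 4)

# Slices along dimension 2 (the z-axis) each have shape (2, 3):
z_slices = [data_cube[:, :, z] for z in range(data_cube.shape[2])]
assert len(z_slices) == 4
assert z_slices[0].shape == (2, 3)
```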

#   def get_buffered_slice_reader( self, offset: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]] = Vec3Int(0,0,0), size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int], NoneType] = None, buffer_size: int = 32, dimension: int = 2 ) -> webknossos.dataset._utils.buffered_slice_reader.BufferedSliceReader:

The BufferedSliceReader yields slices of data along a specified axis. Internally, it reads multiple slices from disk at once and buffers the data. The number of slices that are buffered is specified by buffer_size.

The user can specify along which dimension the data is sliced by using the parameter dimension. To slice along the x-axis use 0, for the y-axis use 1, or for the z-axis use 2 (default: dimension=2).

The BufferedSliceReader must be used as a context manager using the with syntax (see example below). Entering the context returns a generator that yields slices (np.ndarray).

Usage:

view = ...
with view.get_buffered_slice_reader() as reader:
    for slice_data in reader:
        ...

#   def for_each_chunk( self, work_on_chunk: collections.abc.Callable[tuple[webknossos.dataset.view.View, int], NoneType], chunk_size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]], executor: Union[cluster_tools.schedulers.cluster_executor.ClusterExecutor, cluster_tools.WrappedProcessPoolExecutor, NoneType] = None ) -> None:

The view is chunked into multiple sub-views of size chunk_size, and work_on_chunk is performed on each sub-view. Besides the view, a counter 'i' is passed to 'work_on_chunk', which can be used for logging. Additional parameters for 'work_on_chunk' can be bound beforehand (e.g. with named_partial, see the example below). The computation on each chunk must be independent of the others; therefore, the work can be parallelized with executor.

If the View is of type MagView, only the bounding box from the properties is chunked.

Example:

from webknossos.utils import get_executor_for_args, named_partial

def some_work(args: Tuple[View, int], some_parameter: int) -> None:
    view_of_single_chunk, i = args
    # perform operations on the view
    ...

# ...
# let 'mag1' be a `MagView`
view = mag1.get_view()
func = named_partial(some_work, some_parameter=42)
view.for_each_chunk(
    func,
    chunk_size=(100, 100, 100),  # Use mag1._get_file_dimensions() if the size of the chunks should match the size of the files on disk
)
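The chunking itself can be pictured as decomposing the view's bounding box into a grid, with border chunks clipped to the box. A rough pure-Python sketch of that decomposition (an illustration of the idea, not the library's implementation):

```python
from itertools import product
from typing import Iterator, Tuple

Vec = Tuple[int, int, int]

def iter_chunks(size: Vec, chunk_size: Vec) -> Iterator[Tuple[Vec, Vec]]:
    """Yield (offset, size) pairs covering a box of `size`,
    clipping chunks that stick out over the border."""
    ranges = [range(0, s, c) for s, c in zip(size, chunk_size)]
    for offset in product(*ranges):
        yield (
            tuple(offset),
            tuple(min(c, s - o) for o, s, c in zip(offset, size, chunk_size)),
        )

chunks = list(iter_chunks((250, 100, 100), (100, 100, 100)))
# Three chunks along x (widths 100, 100, 50), one along y and z:
assert len(chunks) == 3
assert chunks[-1] == ((200, 0, 0), (50, 100, 100))
```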
#   def for_zipped_chunks( self, work_on_chunk: collections.abc.Callable[tuple[webknossos.dataset.view.View, webknossos.dataset.view.View, int], NoneType], target_view: webknossos.dataset.view.View, source_chunk_size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]], target_chunk_size: Union[webknossos.geometry.vec3_int.Vec3Int, Tuple[int, int, int], Tuple[int, ...], numpy.ndarray, List[int], Iterable[int]], executor: Union[cluster_tools.schedulers.cluster_executor.ClusterExecutor, cluster_tools.WrappedProcessPoolExecutor, NoneType] = None ) -> None:

This method is similar to 'for_each_chunk' in that it delegates work to smaller chunks. However, it also takes a second view as a parameter: both views are chunked simultaneously, and each matching pair of chunks is passed to the function that shall be executed. This is useful if data from one view should be (transformed and) written to a different view, assuming the transformation can be handled on the chunk level. In addition to the two views, a counter 'i' is passed to 'work_on_chunk', which can be used for logging.

The mapping of chunks from the source view to the target view is bijective: the ratio between the size of the source view (self) and source_chunk_size must equal the ratio between the size of the target_view and target_chunk_size. This guarantees that the number of chunks in the source view equals the number of chunks in the target view.

Example use case: downsampling:

  • size of source_view (Mag 1): (16384, 16384, 16384)
  • size of target_view (Mag 2): (8192, 8192, 8192)
  • source_chunk_size: (2048, 2048, 2048)
  • target_chunk_size: (1024, 1024, 1024) (this must be a multiple of the file size on disk to avoid concurrent writes)
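The per-axis invariant from above (size of source view / source_chunk_size == size of target view / target_chunk_size) can be sketched as a small helper. This is a hypothetical check for illustration, not part of the webknossos API:

```python
def check_zipped_chunk_ratio(source_size, target_size, source_chunk, target_chunk):
    """Verify per axis that source and target decompose into the same
    number of chunks (cross-multiplied to stay in integer arithmetic)."""
    for s, t, sc, tc in zip(source_size, target_size, source_chunk, target_chunk):
        assert s * tc == t * sc, "size/chunk_size ratios must match per axis"

# The downsampling example above: both sides split into 8 x 8 x 8 chunks.
check_zipped_chunk_ratio(
    (16384,) * 3, (8192,) * 3, (2048,) * 3, (1024,) * 3
)
```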
#   def get_dtype(self) -> type:

Returns the dtype per channel of the data, e.g. uint8.
