876 lines
36 KiB
Cython
876 lines
36 KiB
Cython
# distutils: language=c++
|
|
# cython: embedsignature=True, language_level=3
|
|
|
|
###############################################################################/
|
|
# Copyright (c) 2021 Nerian Vision GmbH
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
###############################################################################/
|
|
|
|
'''
|
|
Python 3 wrapper for libvisiontransfer by Nerian Vision
|
|
|
|
This module is a wrapper for the libvisiontransfer library,
|
|
used to control and acquire data from Nerian's line of stereo
|
|
vision devices.
|
|
|
|
This documentation is largely autogenerated from the
|
|
C++ library doxygen annotations:
|
|
|
|
Please note that in some instances, the actual functions have been
|
|
adapted to be more Pythonic from their C++-specific calling conventions.
|
|
In particular, the auto-generated documentation of parameter getter
|
|
functions may indicate a number of arguments (C++ reference arguments),
|
|
but they actually directly return tuples in this Python library.
|
|
Refer to their Cython signature line (first line of their docstring)
|
|
to see the true arguments you can use; the rest of the arguments in
|
|
the C++ argument list is instead returned as a result tuple.
|
|
|
|
=============================================================================
|
|
|
|
Copyright (c) 2021 Nerian Vision GmbH
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in
|
|
all copies or substantial portions of the Software.
|
|
|
|
'''
|
|
|
|
cimport visiontransfer_cpp as cpp
|
|
|
|
# Autogenerated parameter access in extra file
|
|
include "visiontransfer/visiontransfer_parameters_autogen.pyx"
|
|
|
|
from libcpp.string cimport string
|
|
from libcpp.vector cimport vector
|
|
from libcpp cimport bool
|
|
from cython cimport view
|
|
|
|
cimport numpy as np
|
|
import numpy as np
|
|
np.import_array()
|
|
|
|
import enum
|
|
import sys
|
|
import time
|
|
|
|
class AutoMode(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::AutoMode")
|
|
AUTO_EXPOSURE_AND_GAIN = 0
|
|
AUTO_EXPOSURE_MANUAL_GAIN = 1
|
|
MANUAL_EXPOSURE_AUTO_GAIN = 2
|
|
MANUAL_EXPOSURE_MANUAL_GAIN = 3
|
|
|
|
class DeviceModel(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::DeviceModel")
|
|
SCENESCAN = 0
|
|
SCENESCAN_PRO = 1
|
|
|
|
class NetworkProtocol(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::NetworkProtocol")
|
|
PROTOCOL_TCP = 0
|
|
PROTOCOL_UDP = 1
|
|
|
|
class ProtocolType(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageProtocol::ProtocolType")
|
|
PROTOCOL_TCP = 0
|
|
PROTOCOL_UDP = 1
|
|
|
|
class ImageFormat(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::ImageFormat")
|
|
FORMAT_8_BIT_MONO = 0
|
|
FORMAT_8_BIT_RGB = 1
|
|
FORMAT_12_BIT_MONO = 2
|
|
|
|
class ImageType(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::ImageType")
|
|
IMAGE_UNDEFINED = 0
|
|
IMAGE_LEFT = 1
|
|
IMAGE_DISPARITY = 2
|
|
IMAGE_RIGHT = 3
|
|
|
|
class OperationMode(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::OperationMode")
|
|
PASS_THROUGH = 0
|
|
RECTIFY = 1
|
|
STEREO_MATCHING = 2
|
|
|
|
class TargetFrame(enum.IntEnum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::TargetFrame")
|
|
LEFT_FRAME = 0
|
|
RIGHT_FRAME = 1
|
|
BOTH_FRAMES = 2
|
|
|
|
cdef class DeviceEnumeration:
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceEnumeration")
|
|
|
|
cdef cpp.DeviceEnumeration c_obj
|
|
|
|
def discover_devices(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceEnumeration::discoverDevices")
|
|
device_infos = []
|
|
cdef vector[cpp.DeviceInfo] devices = self.c_obj.discoverDevices()
|
|
for device in devices:
|
|
di = DeviceInfo()
|
|
di.c_obj = device
|
|
device_infos.append(di)
|
|
return device_infos
|
|
|
|
cdef class DeviceInfo:
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo")
|
|
cdef cpp.DeviceInfo c_obj
|
|
|
|
def get_ip_address(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getIpAddress")
|
|
cdef string s = self.c_obj.getIpAddress()
|
|
return s.decode("utf-8")
|
|
|
|
def get_network_protocol(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getNetworkProtocol")
|
|
return NetworkProtocol(self.c_obj.getNetworkProtocol())
|
|
|
|
def get_firmware_version(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getFirmwareVersion")
|
|
cdef string s = self.c_obj.getFirmwareVersion()
|
|
return s.decode("utf-8")
|
|
|
|
def get_model(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getModel")
|
|
return DeviceModel(self.c_obj.getModel())
|
|
|
|
def get_status(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getStatus")
|
|
ds = DeviceStatus()
|
|
ds.c_obj = self.c_obj.getStatus()
|
|
return ds
|
|
|
|
def is_compatible(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::isCompatible")
|
|
return self.c_obj.isCompatible()
|
|
|
|
def __str__(self):
|
|
cdef string s = self.c_obj.toString()
|
|
return s.decode("utf-8")
|
|
|
|
__repr__ = __str__
|
|
|
|
|
|
cdef class DeviceStatus:
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus")
|
|
cdef cpp.DeviceStatus c_obj
|
|
|
|
def is_valid(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::isValid")
|
|
return self.c_obj.isValid()
|
|
|
|
def get_last_fps(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getLastFps")
|
|
return self.c_obj.getLastFps()
|
|
|
|
def get_jumbo_mtu(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getJumboMtu")
|
|
return self.c_obj.getJumboMtu()
|
|
|
|
def get_jumbo_frames_enabled(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getJumboFramesEnabled")
|
|
# return as bool here (still uint in API)
|
|
return self.c_obj.getJumboFramesEnabled() != 0
|
|
|
|
def get_current_capture_source(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getCurrentCaptureSource")
|
|
cdef string s = self.c_obj.getCurrentCaptureSource()
|
|
return s.decode("utf-8")
|
|
|
|
def create_image_set_from_reduced_data(width, height, nimg, indices, strides, formats, data, qmat, seqnum, subpix, expos, disprange, times, lastsync):
|
|
'''Only for internal use (shim for unpickling / copy).'''
|
|
imset = ImageSet()
|
|
imset.set_width(width)
|
|
imset.set_height(height)
|
|
imset.set_number_of_images(nimg)
|
|
for i, what in enumerate([ImageType.IMAGE_LEFT, ImageType.IMAGE_DISPARITY, ImageType.IMAGE_RIGHT]):
|
|
imset.set_index_of(what, indices[i])
|
|
for i in range(nimg):
|
|
imset.set_row_stride(i, strides[i])
|
|
imset.set_pixel_format(i, formats[i])
|
|
imset.set_pixel_data(i, data[i])
|
|
imset.set_qmatrix(qmat)
|
|
imset.set_sequence_number(seqnum)
|
|
imset.set_subpixel_factor(subpix)
|
|
imset.set_exposure_time(expos)
|
|
a, b = disprange
|
|
imset.set_disparity_range(a, b)
|
|
a, b = times
|
|
imset.set_timestamp(a, b)
|
|
a, b = lastsync
|
|
imset.set_last_sync_pulse(a, b)
|
|
return imset
|
|
|
|
cdef class ImageSet:
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet")
|
|
cdef cpp.ImageSet c_obj
|
|
|
|
cdef np.ndarray _numpy_q
|
|
cdef list _numpy_pixels
|
|
cdef bool _touched_internally
|
|
|
|
def __cinit__(self):
|
|
# These members are just here to keep alive the refcounted
|
|
# data references for ImageSets created on the Python side
|
|
# (e.g. unpickled data) -- bear in mind the C++ API purely
|
|
# operates on unmanaged raw data pointers into numpy arrays.
|
|
self._numpy_q = None
|
|
self._numpy_pixels = [None]*3 # MAX_SUPPORTED_IMAGES
|
|
# Successfully setting pixel data from Python flags this
|
|
# object, whitelisting later overwriting (which is prevented
|
|
# a priori for all C++-/ImageTransfer-managed objects).
|
|
self._touched_internally = False
|
|
|
|
def __reduce__(self):
|
|
nimg = self.get_number_of_images()
|
|
return (create_image_set_from_reduced_data, (
|
|
self.get_width(),
|
|
self.get_height(),
|
|
nimg,
|
|
[self.get_index_of(i) for i in [ImageType.IMAGE_LEFT, ImageType.IMAGE_DISPARITY, ImageType.IMAGE_RIGHT]],
|
|
[self.get_row_stride(i) for i in range(nimg)],
|
|
[self.get_pixel_format(i) for i in range(nimg)],
|
|
[self.get_pixel_data_raw(i) for i in range(nimg)],
|
|
self.get_qmatrix(),
|
|
self.get_sequence_number(),
|
|
self.get_subpixel_factor(),
|
|
self.get_exposure_time(),
|
|
self.get_disparity_range(),
|
|
self.get_timestamp(),
|
|
self.get_last_sync_pulse(),
|
|
))
|
|
|
|
def __str__(self):
|
|
w = self.get_width()
|
|
h = self.get_height()
|
|
return f"ImageSet({w}, {h})"
|
|
|
|
__repr__ = __str__
|
|
|
|
def copy(self):
|
|
'''
|
|
Create a full copy of the ImageSet. All its data is managed by Python (i.e.
|
|
no deallocation attempts by the C++ API will ever take place on this clone).
|
|
'''
|
|
cloned = ImageSet()
|
|
nimg = self.get_number_of_images()
|
|
cloned.set_height(self.get_height())
|
|
cloned.set_width(self.get_width())
|
|
cloned.set_number_of_images(nimg)
|
|
for i in [ImageType.IMAGE_LEFT, ImageType.IMAGE_DISPARITY, ImageType.IMAGE_RIGHT]:
|
|
cloned.set_index_of(i, self.get_index_of(i))
|
|
for i in range(nimg):
|
|
cloned.set_row_stride(i, self.get_row_stride(i))
|
|
cloned.set_pixel_format(i, self.get_pixel_format(i))
|
|
sz = cloned.get_height() * cloned.get_row_stride(i)
|
|
cloned.set_pixel_data(i, self.get_pixel_data_raw(i).copy())
|
|
# this also sets _touched_internally -> data is replaceable
|
|
cloned.set_qmatrix(self.get_qmatrix())
|
|
cloned.set_sequence_number(self.get_sequence_number())
|
|
cloned.set_subpixel_factor(self.get_subpixel_factor())
|
|
cloned.set_exposure_time(self.get_exposure_time())
|
|
a, b = self.get_disparity_range()
|
|
cloned.set_disparity_range(a, b)
|
|
a, b = self.get_timestamp()
|
|
cloned.set_timestamp(a, b)
|
|
a, b = self.get_last_sync_pulse()
|
|
cloned.set_last_sync_pulse(a, b)
|
|
return cloned
|
|
|
|
def get_width(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getWidth")
|
|
return self.c_obj.getWidth()
|
|
|
|
def get_height(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getHeight")
|
|
return self.c_obj.getHeight()
|
|
|
|
def get_row_stride(self, image_number):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getRowStride")
|
|
return self.c_obj.getRowStride(image_number)
|
|
|
|
def get_pixel_format(self, what):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getPixelFormat")
|
|
image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
|
|
return ImageFormat(self.c_obj.getPixelFormat(<int> image_number))
|
|
|
|
def get_pixel_data_raw(self, what):
|
|
'''Return a flat uint8 view of the image data of the specified channel (primarily for internal use).'''
|
|
image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
|
|
cdef int rowstride = self.c_obj.getRowStride(image_number)
|
|
cdef int h = self.c_obj.getHeight()
|
|
cdef int size = rowstride * h
|
|
np_array = self._pixel_data_as_char_array(image_number, size)
|
|
return np_array
|
|
|
|
def get_pixel_data(self, what, force8bit=False, do_copy=True):
|
|
'''
|
|
Obtain a numpy array containing the image data for a channel.
|
|
Args:
|
|
what: The ImageType or image index to retrieve.
|
|
force8bit: optional flag, causes rescaling to 0..255 in case of 12-bit images (dividing by 16).
|
|
do_copy: copy the final array view (default True; primarily for internal use, disable with caution)
|
|
|
|
Returns:
|
|
The image data as a copied numpy array; two-dimensional for monochrome images, three-dimensional for RGB.
|
|
'''
|
|
|
|
image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
|
|
|
|
cdef int rowstride = self.c_obj.getRowStride(image_number)
|
|
cdef int w = self.c_obj.getWidth()
|
|
cdef int h = self.c_obj.getHeight()
|
|
cdef int size
|
|
|
|
fmt = self.get_pixel_format(image_number)
|
|
if fmt == ImageFormat.FORMAT_12_BIT_MONO:
|
|
size = (rowstride * h) // 2
|
|
np_array = self._pixel_data_as_short_array(image_number, size)
|
|
np_array = np_array.reshape(h, rowstride//2)
|
|
np_array = np_array[:, :w]
|
|
if force8bit:
|
|
return (np_array // 16).astype(np.uint8) # implicit copy
|
|
else:
|
|
return np_array.copy() if do_copy else np_array
|
|
elif fmt == ImageFormat.FORMAT_8_BIT_RGB:
|
|
size = rowstride * h
|
|
np_array = self._pixel_data_as_char_array(image_number, size)
|
|
np_array = np_array.reshape(h, rowstride//3, 3)
|
|
np_array = np_array[:, :w, :]
|
|
return np_array.copy() if do_copy else np_array
|
|
elif fmt == ImageFormat.FORMAT_8_BIT_MONO:
|
|
size = rowstride * h
|
|
np_array = self._pixel_data_as_char_array(image_number, size)
|
|
np_array = np_array.reshape(h, rowstride)
|
|
np_array = np_array[:, :w]
|
|
return np_array.copy() if do_copy else np_array
|
|
|
|
cdef _pixel_data_as_short_array(self, int image_number, int size):
|
|
cdef unsigned char* pointer = self.c_obj.getPixelData(image_number)
|
|
cdef np.uint16_t* short_prt = <np.uint16_t *> pointer
|
|
cdef np.uint16_t[:] myview = <np.uint16_t[:size]> short_prt
|
|
return np.asarray(myview)
|
|
|
|
cdef _pixel_data_as_char_array(self, int image_number, int size):
|
|
cdef unsigned char* pointer = self.c_obj.getPixelData(image_number)
|
|
cdef np.uint8_t[:] char_view = <np.uint8_t[:size]> pointer
|
|
return np.asarray(char_view)
|
|
|
|
def get_qmatrix(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getQMatrix")
|
|
cdef view.array ar = view.array(shape=(16, ), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
|
|
cdef const float* pointer = self.c_obj.getQMatrix()
|
|
ar.data = <char*> pointer
|
|
np_array = np.asarray(ar)
|
|
np_array = np_array.reshape(4, 4)
|
|
return np_array
|
|
|
|
def get_sequence_number(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getSequenceNumber")
|
|
return self.c_obj.getSequenceNumber()
|
|
|
|
def get_timestamp(self):
|
|
'''
|
|
Returns the time at which this image set has been captured.
|
|
|
|
Returns:
|
|
sec, usec: A tuple representing the time stamp: the integer seconds, and the
|
|
fractional seconds part in microseconds.
|
|
'''
|
|
cdef int sec = 0
|
|
cdef int usec = 0
|
|
self.c_obj.getTimestamp(sec, usec)
|
|
return sec, usec
|
|
|
|
def get_disparity_range(self):
|
|
'''
|
|
Gets the value range for the disparity map contained in this
|
|
image set. If the image set does not contain any disparity data
|
|
then the disparity range is undefined.
|
|
|
|
Returns:
|
|
minimum, maximum: The minimum and maximum disparity in the image set.
|
|
'''
|
|
cdef int minimum = 0
|
|
cdef int maximum = 0
|
|
self.c_obj.getDisparityRange(minimum, maximum)
|
|
return minimum, maximum
|
|
|
|
def get_subpixel_factor(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getSubpixelFactor")
|
|
return self.c_obj.getSubpixelFactor()
|
|
|
|
def write_pgm_file(self, image_number, filename):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::writePgmFile")
|
|
self.c_obj.writePgmFile(image_number, filename.encode())
|
|
|
|
def is_image_disparity_pair(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::isImageDisparityPair")
|
|
return self.c_obj.isImageDisparityPair()
|
|
|
|
def get_bytes_per_pixel(self, what):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getBytesPerPixel")
|
|
image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
|
|
return self.c_obj.getBytesPerPixel(<int> image_number)
|
|
|
|
def get_number_of_images(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getNumberOfImages")
|
|
return self.c_obj.getNumberOfImages()
|
|
|
|
def get_index_of(self, what, throw_if_not_found=False):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getIndexOf")
|
|
return self.c_obj.getIndexOf(what, throw_if_not_found)
|
|
|
|
def has_image_type(self, what):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::hasImageType")
|
|
return self.c_obj.hasImageType(what)
|
|
|
|
def get_exposure_time(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getExposureTime")
|
|
return self.c_obj.getExposureTime()
|
|
|
|
def get_last_sync_pulse(self):
|
|
'''
|
|
Gets the timestamp of the last received sync pulse.
|
|
|
|
Returns:
|
|
sec, usec: A tuple representing the time stamp: the integer seconds, and the
|
|
fractional seconds part in microseconds.
|
|
'''
|
|
cdef int sec = 0
|
|
cdef int usec = 0
|
|
self.c_obj.getLastSyncPulse(sec, usec)
|
|
return sec, usec
|
|
|
|
def set_width(self, width):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setWidth")
|
|
self.c_obj.setWidth(width)
|
|
|
|
def set_height(self, height):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setHeight")
|
|
self.c_obj.setHeight(height)
|
|
|
|
def set_row_stride(self, image_number, row_stride):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setRowStride")
|
|
self.c_obj.setRowStride(image_number, row_stride)
|
|
|
|
def set_pixel_format(self, image_number, image_format):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setPixelFormat")
|
|
self.c_obj.setPixelFormat(image_number, image_format)
|
|
|
|
def set_pixel_data(self, image_number, np.ndarray[np.uint8_t, ndim=1, mode="c"] pixel_data):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setPixelData")
|
|
cdef unsigned char* oldptr = self.c_obj.getPixelData(image_number)
|
|
if oldptr != NULL and not self._touched_internally:
|
|
# This is the only kind of data access we actively prevent here.
|
|
# The C++ API (ImageTransfer) would have no way of freeing its own
|
|
# buffers, and would try to free the numpy array data instead!
|
|
# The double check is done because it is OK to replace one numpy
|
|
# array with a different one (not really sensible, but valid).
|
|
raise RuntimeError('Refused to set pixel data: pixel data is managed by the C++ API. Please use copy() or start from an empty ImageSet.')
|
|
self.c_obj.setPixelData(image_number, &pixel_data[0]) # raw pointer is stored (will throw here on invalid index)
|
|
self._numpy_pixels[image_number] = pixel_data # store locally for refcount
|
|
self._touched_internally = True # object is whitelisted for overwriting data
|
|
|
|
def set_qmatrix(self, np.ndarray[float, ndim=2, mode="c"] q):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setQMatrix")
|
|
self.c_obj.setQMatrix(&q[0, 0]) # a raw pointer is passed and stored
|
|
self._numpy_q = q # but a reference is stored here to hold a refcount
|
|
|
|
def set_sequence_number(self, num):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setSequenceNumber")
|
|
self.c_obj.setSequenceNumber(num)
|
|
|
|
def set_timestamp(self, sec, usec):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setTimestamp")
|
|
self.c_obj.setTimestamp(sec, usec)
|
|
|
|
def set_disparity_range(self, minimum, maximum):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setDisparityRange")
|
|
self.c_obj.setDisparityRange(minimum, maximum)
|
|
|
|
def set_subpixel_factor(self, subpixel_factor):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setSubpixelFactor")
|
|
self.c_obj.setSubpixelFactor(subpixel_factor)
|
|
|
|
def set_number_of_images(self, number):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setNumberOfImages")
|
|
cdef unsigned char* oldptr = self.c_obj.getPixelData(0)
|
|
# Changing the number of images with data present could mess up
|
|
# memory management (e.g. by allowing to add numpy data to new
|
|
# channels of C++-managed objects, or preventing necessary frees).
|
|
# Therefore, we allow setting this number only in ImageSets that
|
|
# have only been filled from the Python side.
|
|
if oldptr != NULL and not self._touched_internally:
|
|
raise RuntimeError('Refused to change number of images: pixel data is managed by the C++ API. Please use copy() or start from an empty ImageSet.')
|
|
self.c_obj.setNumberOfImages(number)
|
|
|
|
def set_index_of(self, what, idx):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setIndexOf")
|
|
self.c_obj.setIndexOf(what, idx)
|
|
|
|
def set_exposure_time(self, time_microsec):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setExposureTime")
|
|
self.c_obj.setExposureTime(time_microsec)
|
|
|
|
def set_last_sync_pulse(self, sec, usec):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setLastSyncPulse")
|
|
return self.c_obj.setLastSyncPulse(sec, usec)
|
|
|
|
|
|
cdef class ImageTransfer:
|
|
'''
|
|
Class for synchronous transfer of image sets.
|
|
|
|
This class opens a network socket for delivering or receiving image sets. All
|
|
operations are performed synchronously, which means that they might block.
|
|
The class encapsulates ImageProtocol.
|
|
|
|
This class is thread safe for as long as sending and receiving data
|
|
each has its dedicated thread.
|
|
|
|
Note for Python version: for best performance, the use of AsyncTransfer
|
|
is recommended for all regular desktop systems.
|
|
'''
|
|
cdef cpp.ImageTransfer* c_obj
|
|
|
|
def __cinit__(self, DeviceInfo device, int buffer_size=1048576, int max_udp_packet_size=1472):
|
|
self.c_obj = new cpp.ImageTransfer(device.c_obj, buffer_size, max_udp_packet_size)
|
|
|
|
def __dealloc__(self):
|
|
del self.c_obj
|
|
|
|
def is_connected(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::isConnected")
|
|
return self.c_obj.isConnected()
|
|
|
|
def disconnect(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::disconnect")
|
|
self.c_obj.disconnect()
|
|
|
|
def get_remote_address(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::getRemoteAddress")
|
|
cdef string s = self.c_obj.getRemoteAddress()
|
|
return s.decode("utf-8")
|
|
|
|
def receive_image_pair(self):
|
|
'''DEPRECATED: Use receive_image_set() instead.'''
|
|
return self.receive_image_set()
|
|
|
|
def receive_image_set(self):
|
|
'''
|
|
Waits for and receives a new image set.
|
|
|
|
Returns:
|
|
Returns an ImageSet a new image set has been received. Otherwise
|
|
None.
|
|
|
|
The received image set is only valid until the next call of receive_image_set.
|
|
The method will not block indefinitely, but return after a short timeout.
|
|
|
|
You can use receive() as a Python library convenience wrapper
|
|
for more efficient repolling with custom delay and number of attempts.
|
|
'''
|
|
imp = ImageSet()
|
|
ret = self.c_obj.receiveImageSet(imp.c_obj)
|
|
return imp if ret else None
|
|
|
|
def receive(self, timeout=-1, poll_delay=0.001):
|
|
'''
|
|
Python: polling wrapper for receive_image_set.
|
|
|
|
Args:
|
|
timeout: The timeout in seconds before returning None unless an
|
|
image arrives. A non-positive timeout means to wait forever.
|
|
poll_delay: The sleep delay to enforce after each polling
|
|
attempt.
|
|
|
|
Returns:
|
|
An ImageSet if an image set has been received before the timeout.
|
|
None otherwise.
|
|
|
|
On desktop systems, use AsyncTransfer instead for best performance.
|
|
'''
|
|
imp = ImageSet()
|
|
t0 = time.time()
|
|
while timeout <= 0 or (time.time() - t0) < timeout:
|
|
ret = self.c_obj.receiveImageSet(imp.c_obj)
|
|
if ret: return imp
|
|
time.sleep(poll_delay)
|
|
return None
|
|
|
|
def get_num_dropped_frames(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::getNumDroppedFrames")
|
|
return self.c_obj.getNumDroppedFrames()
|
|
|
|
cdef class AsyncTransfer:
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer")
|
|
cdef cpp.AsyncTransfer* c_obj
|
|
|
|
def __cinit__(self, DeviceInfo device, int buffer_size=1048576, int max_udp_packet_size=1472):
|
|
self.c_obj = new cpp.AsyncTransfer(device.c_obj, buffer_size, max_udp_packet_size)
|
|
|
|
def __dealloc__(self):
|
|
del self.c_obj
|
|
|
|
def is_connected(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::isConnected")
|
|
return self.c_obj.isConnected()
|
|
|
|
def disconnect(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::disconnect")
|
|
self.c_obj.disconnect()
|
|
|
|
def get_remote_address(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::getRemoteAddress")
|
|
cdef string s = self.c_obj.getRemoteAddress()
|
|
return s.decode("utf-8")
|
|
|
|
def collect_received_image_pair(self, timeout=-1):
|
|
'''DEPRECATED: Use collect_received_image_set() instead.'''
|
|
return self.collect_received_image_set(timeout)
|
|
|
|
def collect_received_image_set(self, timeout=-1):
|
|
'''
|
|
Collects the asynchronously received image.
|
|
|
|
Args:
|
|
timeout: The maximum time in seconds for which to wait if no
|
|
image set has been received yet.
|
|
|
|
Returns:
|
|
An ImageSet if an image set has been received before the timeout.
|
|
|
|
If no image set has been received, this method might block or return None.
|
|
Otherwise the returned image set is valid until the next call.
|
|
|
|
If timeout is set to a value < 0, the function will block indefinitely.
|
|
If timeout = 0, the function will return immediately, and if timeout is > 0 then
|
|
the function will block for the given amount of time in seconds. The received
|
|
image set is only valid until the next call of this function.
|
|
'''
|
|
imp = ImageSet()
|
|
ret = self.c_obj.collectReceivedImageSet(imp.c_obj, timeout)
|
|
return imp if ret else None
|
|
|
|
def get_num_dropped_frames(self):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::getNumDroppedFrames")
|
|
return self.c_obj.getNumDroppedFrames()
|
|
|
|
cdef class Reconstruct3D:
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::Reconstruct3D")
|
|
cdef cpp.Reconstruct3D c_obj
|
|
|
|
def create_point_map_from_disparity_data(self, disp_map_data, width, height, row_stride, q, min_disparity=1, subpixel_factor=16, max_z=0, max_disparity=0xFFF):
|
|
'''
|
|
Reconstructs the 3D location of each pixel in the given disparity map,
|
|
with custom parameters.
|
|
|
|
Args:
|
|
disp_map_data: Data of the disparity map (unsigned short array). The
|
|
disparity map is assumed to have a N-bit subpixel resolution.
|
|
This means that each value needs to be divided by the subpixel factor
|
|
to receive the true disparity.
|
|
width, height: Disparity map dimensions
|
|
row_stride: Row stride (i.e. distance between two rows in bytes)
|
|
of the disparity map.
|
|
q: Disparity-to-depth mapping matrix of size 4x4. The matrix is
|
|
stored in a row-wise alignment. Obtain this matrix from your
|
|
camera calibration data.
|
|
minDisparity: The minimum disparity, again with N-bit subpixel
|
|
resolution. Lower disparities will be clamped to this value
|
|
before computing the 3D location (default 1).
|
|
subpixel_factor: Subpixel division factor for disparity value
|
|
(default 16)
|
|
max_z: (Python specific) Filter the numpy array to only return
|
|
points closer than specified value. A non-positive value means
|
|
no filtering (default).
|
|
max_disparity: The maximum value that occurs in the disparity map. Any value
|
|
greater or equal will be marked as invalid.
|
|
Returns:
|
|
A numpy array of size [:,3] containing the 3D points corresponding to the disparity map.
|
|
|
|
Please refer to the C++ API docs for further details.
|
|
'''
|
|
cdef int size = width * height * 4
|
|
cdef unsigned short[:, ::1] disp_map_arr = disp_map_data
|
|
cdef float[:, ::1] q_arr = q.astype(np.float32)
|
|
cdef float* point_map_data = self.c_obj.createPointMap(&disp_map_arr[0, 0], width, height, row_stride, &q_arr[0, 0], min_disparity, subpixel_factor, max_disparity)
|
|
|
|
cdef view.array arr = view.array(shape=(size,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
|
|
arr.data = <char*> point_map_data
|
|
|
|
np_array = np.asarray(arr)
|
|
np_array = np_array.reshape(height * width, 4)
|
|
np_array = np_array[:, :3]
|
|
|
|
if max_z > 0:
|
|
np_array = np_array[np_array[:, 2] < max_z]
|
|
|
|
return np_array
|
|
|
|
def create_point_map(self, ImageSet image_set, min_disparity=1, max_z=0):
|
|
'''
|
|
Reconstructs the 3D location of each pixel using the disparity map
|
|
and metadata of the given image set.
|
|
|
|
Args:
|
|
image_set: Image set containing the disparity map.
|
|
min_disparity: The minimum disparity with 4-bit subpixel resolution.
|
|
max_z: (Python specific) Filter the numpy array to only return
|
|
points closer than specified value. A non-positive value means
|
|
no filtering (default).
|
|
|
|
Returns:
|
|
A numpy array of size [:,3] containing the 3D points corresponding to the disparity map.
|
|
|
|
Please refer to the C++ API docs for further details.
|
|
'''
|
|
cdef int w = image_set.c_obj.getWidth()
|
|
cdef int h = image_set.c_obj.getHeight()
|
|
cdef int size = w * h * 4
|
|
cdef float* point_map_data = self.c_obj.createPointMap(image_set.c_obj, min_disparity)
|
|
|
|
cdef view.array arr = view.array(shape=(size,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
|
|
arr.data = <char*> point_map_data
|
|
|
|
np_array = np.asarray(arr)
|
|
np_array = np_array.reshape(h * w, 4)
|
|
np_array = np_array[:, :3]
|
|
|
|
if max_z > 0:
|
|
np_array = np_array[np_array[:, 2] < max_z]
|
|
|
|
return np_array
|
|
|
|
def create_point_map_and_color_map(self, ImageSet image_set, min_disparity=1, max_z=0):
|
|
'''
|
|
Reconstructs the 3D location of each pixel using the disparity map
|
|
and metadata of the given image set, alongside their colors.
|
|
|
|
Args:
|
|
image_set: Image set containing the disparity map.
|
|
min_disparity: The minimum disparity with 4-bit subpixel resolution.
|
|
max_z: (Python specific) Filter the numpy array to only return
|
|
points closer than specified value. A non-positive value means
|
|
no filtering (default).
|
|
|
|
Returns:
|
|
Two numpy arrays of identical size [:,3], the first containing the 3D points corresponding
|
|
to the disparity map, and the second one their colors as float RGB triplets (or None if
|
|
the ImageSet is disparity-only).
|
|
'''
|
|
cdef int w = image_set.c_obj.getWidth()
|
|
cdef int h = image_set.c_obj.getHeight()
|
|
cdef int size = w * h * 4
|
|
cdef float* point_map_data = self.c_obj.createPointMap(image_set.c_obj, min_disparity)
|
|
|
|
cdef view.array arr = view.array(shape=(size,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
|
|
arr.data = <char*> point_map_data
|
|
|
|
coords = np.asarray(arr)
|
|
coords = coords.reshape(h * w, 4)
|
|
coords = coords[:, :3]
|
|
|
|
pix = None
|
|
if image_set.has_image_type(ImageType.IMAGE_LEFT):
|
|
pix = image_set.get_pixel_data(ImageType.IMAGE_LEFT, force8bit=True, do_copy=False)
|
|
if len(pix.shape)==2: pix = np.stack([pix]*3, 2) # Expand grayscale to rgb triplets
|
|
pix = pix.reshape((-1, 3)).astype(np.float64) / 255.0
|
|
|
|
if max_z > 0:
|
|
if pix is not None:
|
|
pix = pix[coords[:, 2] < max_z]
|
|
coords = coords[coords[:, 2] < max_z]
|
|
|
|
return coords, pix
|
|
|
|
def create_open3d_pointcloud(self, ImageSet image_set, min_disparity=1, max_z=0):
|
|
'''
|
|
Convenience wrapper to directly return an Open3D point cloud for an image set.
|
|
|
|
Args:
|
|
image_set: Image set containing the disparity map.
|
|
min_disparity: The minimum disparity with 4-bit subpixel resolution.
|
|
max_z: (Python specific) Filter the point cloud data to only return
|
|
points closer than specified value. A non-positive value means
|
|
no filtering (default).
|
|
|
|
Returns:
|
|
An open3d.geometry.PointCloud for the (filtered) coordinates from the ImageSet.
|
|
Contains color information unless the ImageSet was disparity-only.
|
|
'''
|
|
import open3d
|
|
pointmap, colors = self.create_point_map_and_color_map(image_set, min_disparity=min_disparity, max_z=max_z)
|
|
pcd = open3d.geometry.PointCloud(open3d.utility.Vector3dVector(pointmap))
|
|
if colors is not None:
|
|
pcd.colors = open3d.utility.Vector3dVector(colors)
|
|
return pcd
|
|
|
|
def create_open3d_rgbd_image(self, ImageSet image_set, min_disparity=1, depth_trunc=3.0, depth_scale=1.0):
|
|
'''
|
|
Convenience wrapper to directly return an Open3D RGBD image for an ImageSet.
|
|
Raises a RuntimeError when called with a disparity-only image set.
|
|
|
|
Args:
|
|
image_set: Image set containing the disparity map.
|
|
min_disparity: The minimum disparity with 4-bit subpixel resolution.
|
|
depth_trunc: (Open3D argument, relayed) Filter the depth channel to
|
|
zero-clamp points more distant than the specified value (default 3.0).
|
|
|
|
Returns:
|
|
An open3d.geometry.RGBDImage for the image set.
|
|
'''
|
|
import open3d
|
|
|
|
if not image_set.has_image_type(ImageType.IMAGE_LEFT):
|
|
raise RuntimeError('Cannot create an RGBD image - no left image data in ImageSet')
|
|
|
|
cdef int w = image_set.c_obj.getWidth()
|
|
cdef int h = image_set.c_obj.getHeight()
|
|
cdef float* z_data = self.c_obj.createZMap(image_set.c_obj, minDisparity=min_disparity, maxDisparity=0xFFF)
|
|
cdef view.array arr = view.array(shape=(h, w,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
|
|
arr.data = <char*> z_data
|
|
depth = np.asarray(arr).astype(np.float32)
|
|
|
|
color = image_set.get_pixel_data(ImageType.IMAGE_LEFT, force8bit=True)
|
|
|
|
img = open3d.geometry.RGBDImage.create_from_color_and_depth(
|
|
open3d.cpu.pybind.geometry.Image(color),
|
|
open3d.cpu.pybind.geometry.Image(depth),
|
|
depth_scale=depth_scale, depth_trunc=depth_trunc)
|
|
return img
|
|
|
|
def project_single_point(self, point_x, point_y, disparity, q, subpix_factor):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::Reconstruct3D::projectSinglePoint", "PYTHON NOTE: Returns a tuple (pointX, pointY, pointZ). Please ignore those C++ reference arguments.")
|
|
cdef float proj_x = 0
|
|
cdef float proj_y = 0
|
|
cdef float proj_z = 0
|
|
cdef float[:, ::1] q_arr = q.astype(np.float32)
|
|
self.c_obj.projectSinglePoint(point_x, point_y, disparity, &q_arr[0, 0], proj_x, proj_y, proj_z, subpix_factor)
|
|
return proj_x, proj_y, proj_z
|
|
|
|
|
|
def write_ply_file(self, filename, ImageSet image_set, double max_z=sys.float_info.max, bool binary=False):
|
|
_SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::Reconstruct3D::writePlyFile")
|
|
self.c_obj.writePlyFile(filename.encode(), image_set.c_obj, max_z, binary)
|
|
|
|
|