# distutils: language=c++
# cython: embedsignature=True, language_level=3

###############################################################################/
# Copyright (c) 2021 Nerian Vision GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
###############################################################################/

'''
Python 3 wrapper for libvisiontransfer by Nerian Vision

This module is a wrapper for the libvisiontransfer library,
used to control and acquire data from Nerian's line of stereo
vision devices.

This documentation is largely autogenerated from the
C++ library doxygen annotations.

Please note that in some instances, the actual functions have been
adapted to be more Pythonic from their C++-specific calling conventions.
In particular, the auto-generated documentation of parameter getter
functions may indicate a number of arguments (C++ reference arguments),
but they actually directly return tuples in this Python library.
Refer to their Cython signature line (first line of their docstring)
to see the true arguments you can use; the rest of the arguments in
the C++ argument list is instead returned as a result tuple.

=============================================================================

Copyright (c) 2021 Nerian Vision GmbH

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
'''

cimport visiontransfer_cpp as cpp

# Autogenerated parameter access in extra file
include "visiontransfer/visiontransfer_parameters_autogen.pyx"

from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp cimport bool
from cython cimport view

cimport numpy as np
import numpy as np
np.import_array()

import enum
import sys
import time

# NOTE: the _SUBSTITUTE_DOCSTRING_FOR_(...) markers used throughout this file
# stand in for docstrings; presumably a build step replaces them with the
# doxygen text of the named C++ entity -- verify against the build scripts.
# They must stay the first statement of their class / function.


class AutoMode(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::AutoMode")
    AUTO_EXPOSURE_AND_GAIN = 0
    AUTO_EXPOSURE_MANUAL_GAIN = 1
    MANUAL_EXPOSURE_AUTO_GAIN = 2
    MANUAL_EXPOSURE_MANUAL_GAIN = 3


class DeviceModel(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::DeviceModel")
    SCENESCAN = 0
    SCENESCAN_PRO = 1


class NetworkProtocol(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::NetworkProtocol")
    PROTOCOL_TCP = 0
    PROTOCOL_UDP = 1


class ProtocolType(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageProtocol::ProtocolType")
    PROTOCOL_TCP = 0
    PROTOCOL_UDP = 1


class ImageFormat(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::ImageFormat")
    FORMAT_8_BIT_MONO = 0
    FORMAT_8_BIT_RGB = 1
    FORMAT_12_BIT_MONO = 2


class ImageType(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::ImageType")
    IMAGE_UNDEFINED = 0
    IMAGE_LEFT = 1
    IMAGE_DISPARITY = 2
    IMAGE_RIGHT = 3


class OperationMode(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::OperationMode")
    PASS_THROUGH = 0
    RECTIFY = 1
    STEREO_MATCHING = 2


class TargetFrame(enum.IntEnum):
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceParameters::TargetFrame")
    LEFT_FRAME = 0
    RIGHT_FRAME = 1
    BOTH_FRAMES = 2


cdef class DeviceEnumeration:
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceEnumeration")
    cdef cpp.DeviceEnumeration c_obj

    def discover_devices(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceEnumeration::discoverDevices")
        device_infos = []
        cdef vector[cpp.DeviceInfo] devices = self.c_obj.discoverDevices()
        # Wrap each C++ DeviceInfo in its Python-visible counterpart.
        for device in devices:
            di = DeviceInfo()
            di.c_obj = device
            device_infos.append(di)
        return device_infos


cdef class DeviceInfo:
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo")
    cdef cpp.DeviceInfo c_obj

    def get_ip_address(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getIpAddress")
        cdef string s = self.c_obj.getIpAddress()
        return s.decode("utf-8")

    def get_network_protocol(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getNetworkProtocol")
        return NetworkProtocol(self.c_obj.getNetworkProtocol())

    def get_firmware_version(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getFirmwareVersion")
        cdef string s = self.c_obj.getFirmwareVersion()
        return s.decode("utf-8")

    def get_model(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getModel")
        return DeviceModel(self.c_obj.getModel())

    def get_status(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::getStatus")
        ds = DeviceStatus()
        ds.c_obj = self.c_obj.getStatus()
        return ds

    def is_compatible(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceInfo::isCompatible")
        return self.c_obj.isCompatible()

    def __str__(self):
        cdef string s = self.c_obj.toString()
        return s.decode("utf-8")

    __repr__ = __str__


cdef class DeviceStatus:
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus")
    cdef cpp.DeviceStatus c_obj

    def is_valid(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::isValid")
        return self.c_obj.isValid()

    def get_last_fps(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getLastFps")
        return self.c_obj.getLastFps()

    def get_jumbo_mtu(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getJumboMtu")
        return self.c_obj.getJumboMtu()

    def get_jumbo_frames_enabled(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getJumboFramesEnabled")
        # return as bool here (still uint in API)
        return self.c_obj.getJumboFramesEnabled() != 0

    def get_current_capture_source(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::DeviceStatus::getCurrentCaptureSource")
        cdef string s = self.c_obj.getCurrentCaptureSource()
        return s.decode("utf-8")


def create_image_set_from_reduced_data(width, height, nimg, indices, strides,
                                       formats, data, qmat, seqnum, subpix,
                                       expos, disprange, times, lastsync):
    '''Only for internal use (shim for unpickling / copy).

    Rebuilds an ImageSet from the argument tuple produced by
    ImageSet.__reduce__. The arrays in ``data`` and ``qmat`` are referenced
    (not copied) by the new ImageSet, which keeps them alive.
    '''
    imset = ImageSet()
    imset.set_width(width)
    imset.set_height(height)
    imset.set_number_of_images(nimg)
    # 'indices' is ordered as [left, disparity, right], matching __reduce__.
    for i, what in enumerate([ImageType.IMAGE_LEFT, ImageType.IMAGE_DISPARITY, ImageType.IMAGE_RIGHT]):
        imset.set_index_of(what, indices[i])
    for i in range(nimg):
        imset.set_row_stride(i, strides[i])
        imset.set_pixel_format(i, formats[i])
        imset.set_pixel_data(i, data[i])
    imset.set_qmatrix(qmat)
    imset.set_sequence_number(seqnum)
    imset.set_subpixel_factor(subpix)
    imset.set_exposure_time(expos)
    a, b = disprange
    imset.set_disparity_range(a, b)
    a, b = times
    imset.set_timestamp(a, b)
    a, b = lastsync
    imset.set_last_sync_pulse(a, b)
    return imset


cdef class ImageSet:
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet")
    cdef cpp.ImageSet c_obj
    cdef np.ndarray _numpy_q
    cdef list _numpy_pixels
    cdef bool _touched_internally

    def __cinit__(self):
        # These members are just here to keep alive the refcounted
        # data references for ImageSets created on the Python side
        # (e.g. unpickled data) -- bear in mind the C++ API purely
        # operates on unmanaged raw data pointers into numpy arrays.
        self._numpy_q = None
        self._numpy_pixels = [None]*3  # MAX_SUPPORTED_IMAGES
        # Successfully setting pixel data from Python flags this
        # object, whitelisting later overwriting (which is prevented
        # a priori for all C++-/ImageTransfer-managed objects).
        self._touched_internally = False

    def __reduce__(self):
        '''Pickle / copy support: reduce to plain values and owned arrays.'''
        nimg = self.get_number_of_images()
        return (create_image_set_from_reduced_data, (
            self.get_width(),
            self.get_height(),
            nimg,
            [self.get_index_of(i) for i in [ImageType.IMAGE_LEFT, ImageType.IMAGE_DISPARITY, ImageType.IMAGE_RIGHT]],
            [self.get_row_stride(i) for i in range(nimg)],
            [self.get_pixel_format(i) for i in range(nimg)],
            # Owned copies: the raw getters return non-owning views, and
            # copy.copy() forwards these arguments without serializing them,
            # which would leave the new ImageSet pointing into our buffers.
            [self.get_pixel_data_raw(i).copy() for i in range(nimg)],
            self.get_qmatrix().copy(),
            self.get_sequence_number(),
            self.get_subpixel_factor(),
            self.get_exposure_time(),
            self.get_disparity_range(),
            self.get_timestamp(),
            self.get_last_sync_pulse(),
        ))

    def __str__(self):
        w = self.get_width()
        h = self.get_height()
        return f"ImageSet({w}, {h})"

    __repr__ = __str__

    def copy(self):
        '''
        Create a full copy of the ImageSet. All its data is managed by Python
        (i.e. no deallocation attempts by the C++ API will ever take place
        on this clone).
        '''
        cloned = ImageSet()
        nimg = self.get_number_of_images()
        cloned.set_height(self.get_height())
        cloned.set_width(self.get_width())
        cloned.set_number_of_images(nimg)
        for i in [ImageType.IMAGE_LEFT, ImageType.IMAGE_DISPARITY, ImageType.IMAGE_RIGHT]:
            cloned.set_index_of(i, self.get_index_of(i))
        for i in range(nimg):
            cloned.set_row_stride(i, self.get_row_stride(i))
            cloned.set_pixel_format(i, self.get_pixel_format(i))
            cloned.set_pixel_data(i, self.get_pixel_data_raw(i).copy())
            # this also sets _touched_internally -> data is replaceable
        # get_qmatrix() is a non-owning view and set_qmatrix() stores a raw
        # pointer; copy so that the clone does not point into our memory.
        cloned.set_qmatrix(self.get_qmatrix().copy())
        cloned.set_sequence_number(self.get_sequence_number())
        cloned.set_subpixel_factor(self.get_subpixel_factor())
        cloned.set_exposure_time(self.get_exposure_time())
        a, b = self.get_disparity_range()
        cloned.set_disparity_range(a, b)
        a, b = self.get_timestamp()
        cloned.set_timestamp(a, b)
        a, b = self.get_last_sync_pulse()
        cloned.set_last_sync_pulse(a, b)
        return cloned

    def get_width(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getWidth")
        return self.c_obj.getWidth()

    def get_height(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getHeight")
        return self.c_obj.getHeight()

    def get_row_stride(self, image_number):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getRowStride")
        return self.c_obj.getRowStride(image_number)

    def get_pixel_format(self, what):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getPixelFormat")
        # 'what' may be an ImageType (resolved to its channel index, raising
        # if absent) or a plain image index.
        image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
        return ImageFormat(self.c_obj.getPixelFormat(image_number))

    def get_pixel_data_raw(self, what):
        '''Return a flat uint8 view of the image data of the specified channel
        (primarily for internal use).

        The result does not own its memory; it refers to the buffer currently
        attached to this ImageSet.
        '''
        image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
        cdef int rowstride = self.c_obj.getRowStride(image_number)
        cdef int h = self.c_obj.getHeight()
        cdef int size = rowstride * h
        np_array = self._pixel_data_as_char_array(image_number, size)
        return np_array

    def get_pixel_data(self, what, force8bit=False, do_copy=True):
        '''
        Obtain a numpy array containing the image data for a channel.

        Args:
            what: The ImageType or image index to retrieve.
            force8bit: optional flag, causes rescaling to 0..255 in case of
                12-bit images (dividing by 16).
            do_copy: copy the final array view (default True; primarily for
                internal use, disable with caution)

        Returns:
            The image data as a numpy array (a copy, unless do_copy is False,
            in which case it is a view into the ImageSet's buffer);
            two-dimensional for monochrome images, three-dimensional for RGB.
        '''
        image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)

        cdef int rowstride = self.c_obj.getRowStride(image_number)
        cdef int w = self.c_obj.getWidth()
        cdef int h = self.c_obj.getHeight()
        cdef int size

        fmt = self.get_pixel_format(image_number)
        if fmt == ImageFormat.FORMAT_12_BIT_MONO:
            # Two bytes per pixel: view as uint16, then crop the row padding.
            size = (rowstride * h) // 2
            np_array = self._pixel_data_as_short_array(image_number, size)
            np_array = np_array.reshape(h, rowstride//2)
            np_array = np_array[:, :w]
            if force8bit:
                return (np_array // 16).astype(np.uint8)  # implicit copy
            else:
                return np_array.copy() if do_copy else np_array
        elif fmt == ImageFormat.FORMAT_8_BIT_RGB:
            size = rowstride * h
            np_array = self._pixel_data_as_char_array(image_number, size)
            np_array = np_array.reshape(h, rowstride//3, 3)
            np_array = np_array[:, :w, :]
            return np_array.copy() if do_copy else np_array
        elif fmt == ImageFormat.FORMAT_8_BIT_MONO:
            size = rowstride * h
            np_array = self._pixel_data_as_char_array(image_number, size)
            np_array = np_array.reshape(h, rowstride)
            np_array = np_array[:, :w]
            return np_array.copy() if do_copy else np_array

    cdef _pixel_data_as_short_array(self, int image_number, int size):
        # Non-owning uint16 view of 'size' elements over the channel buffer.
        cdef unsigned char* pointer = self.c_obj.getPixelData(image_number)
        cdef np.uint16_t* short_prt = <np.uint16_t*>pointer
        # A pointer needs an explicit extent to become a memoryview.
        cdef np.uint16_t[:] myview = <np.uint16_t[:size]>short_prt
        return np.asarray(myview)

    cdef _pixel_data_as_char_array(self, int image_number, int size):
        # Non-owning uint8 view of 'size' bytes over the channel buffer.
        cdef unsigned char* pointer = self.c_obj.getPixelData(image_number)
        cdef np.uint8_t[:] char_view = <np.uint8_t[:size]>pointer
        return np.asarray(char_view)

    def get_qmatrix(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getQMatrix")
        # allocate_buffer=False: the array merely wraps the C++-side Q data.
        cdef view.array ar = view.array(shape=(16, ), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
        cdef const float* pointer = self.c_obj.getQMatrix()
        ar.data = <char*>pointer
        np_array = np.asarray(ar)
        np_array = np_array.reshape(4, 4)
        return np_array

    def get_sequence_number(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getSequenceNumber")
        return self.c_obj.getSequenceNumber()

    def get_timestamp(self):
        '''
        Returns the time at which this image set has been captured.

        Returns:
            sec, usec: A tuple representing the time stamp: the integer seconds, and the
            fractional seconds part in microseconds.
        '''
        cdef int sec = 0
        cdef int usec = 0
        self.c_obj.getTimestamp(sec, usec)
        return sec, usec

    def get_disparity_range(self):
        '''
        Gets the value range for the disparity map contained in this
        image set. If the image set does not contain any disparity data
        then the disparity range is undefined.

        Returns:
            minimum, maximum: The minimum and maximum disparity in the image set.
        '''
        cdef int minimum = 0
        cdef int maximum = 0
        self.c_obj.getDisparityRange(minimum, maximum)
        return minimum, maximum

    def get_subpixel_factor(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getSubpixelFactor")
        return self.c_obj.getSubpixelFactor()

    def write_pgm_file(self, image_number, filename):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::writePgmFile")
        self.c_obj.writePgmFile(image_number, filename.encode())

    def is_image_disparity_pair(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::isImageDisparityPair")
        return self.c_obj.isImageDisparityPair()

    def get_bytes_per_pixel(self, what):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getBytesPerPixel")
        image_number = self.get_index_of(what, True) if isinstance(what, ImageType) else int(what)
        return self.c_obj.getBytesPerPixel(image_number)

    def get_number_of_images(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getNumberOfImages")
        return self.c_obj.getNumberOfImages()

    def get_index_of(self, what, throw_if_not_found=False):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getIndexOf")
        return self.c_obj.getIndexOf(what, throw_if_not_found)

    def has_image_type(self, what):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::hasImageType")
        return self.c_obj.hasImageType(what)

    def get_exposure_time(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::getExposureTime")
        return self.c_obj.getExposureTime()

    def get_last_sync_pulse(self):
        '''
        Gets the timestamp of the last received sync pulse.

        Returns:
            sec, usec: A tuple representing the time stamp: the integer seconds, and the
            fractional seconds part in microseconds.
        '''
        cdef int sec = 0
        cdef int usec = 0
        self.c_obj.getLastSyncPulse(sec, usec)
        return sec, usec

    def set_width(self, width):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setWidth")
        self.c_obj.setWidth(width)

    def set_height(self, height):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setHeight")
        self.c_obj.setHeight(height)

    def set_row_stride(self, image_number, row_stride):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setRowStride")
        self.c_obj.setRowStride(image_number, row_stride)

    def set_pixel_format(self, image_number, image_format):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setPixelFormat")
        self.c_obj.setPixelFormat(image_number, image_format)

    def set_pixel_data(self, image_number, np.ndarray[np.uint8_t, ndim=1, mode="c"] pixel_data):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setPixelData")
        cdef unsigned char* oldptr = self.c_obj.getPixelData(image_number)
        if oldptr != NULL and not self._touched_internally:
            # This is the only kind of data access we actively prevent here.
            # The C++ API (ImageTransfer) would have no way of freeing its own
            # buffers, and would try to free the numpy array data instead!
            # The double check is done because it is OK to replace one numpy
            # array with a different one (not really sensible, but valid).
            raise RuntimeError('Refused to set pixel data: pixel data is managed by the C++ API. Please use copy() or start from an empty ImageSet.')
        self.c_obj.setPixelData(image_number, &pixel_data[0])  # raw pointer is stored (will throw here on invalid index)
        self._numpy_pixels[image_number] = pixel_data  # store locally for refcount
        self._touched_internally = True  # object is whitelisted for overwriting data

    def set_qmatrix(self, np.ndarray[float, ndim=2, mode="c"] q):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setQMatrix")
        self.c_obj.setQMatrix(&q[0, 0])  # a raw pointer is passed and stored
        self._numpy_q = q  # but a reference is stored here to hold a refcount

    def set_sequence_number(self, num):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setSequenceNumber")
        self.c_obj.setSequenceNumber(num)

    def set_timestamp(self, sec, usec):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setTimestamp")
        self.c_obj.setTimestamp(sec, usec)

    def set_disparity_range(self, minimum, maximum):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setDisparityRange")
        self.c_obj.setDisparityRange(minimum, maximum)

    def set_subpixel_factor(self, subpixel_factor):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setSubpixelFactor")
        self.c_obj.setSubpixelFactor(subpixel_factor)

    def set_number_of_images(self, number):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setNumberOfImages")
        cdef unsigned char* oldptr = self.c_obj.getPixelData(0)
        # Changing the number of images with data present could mess up
        # memory management (e.g. by allowing to add numpy data to new
        # channels of C++-managed objects, or preventing necessary frees).
        # Therefore, we allow setting this number only in ImageSets that
        # have only been filled from the Python side.
        if oldptr != NULL and not self._touched_internally:
            raise RuntimeError('Refused to change number of images: pixel data is managed by the C++ API. Please use copy() or start from an empty ImageSet.')
        self.c_obj.setNumberOfImages(number)

    def set_index_of(self, what, idx):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setIndexOf")
        self.c_obj.setIndexOf(what, idx)

    def set_exposure_time(self, time_microsec):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setExposureTime")
        self.c_obj.setExposureTime(time_microsec)

    def set_last_sync_pulse(self, sec, usec):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageSet::setLastSyncPulse")
        self.c_obj.setLastSyncPulse(sec, usec)


cdef class ImageTransfer:
    '''
    Class for synchronous transfer of image sets.

    This class opens a network socket for delivering or receiving image sets. All
    operations are performed synchronously, which means that they might block.
    The class encapsulates ImageProtocol.

    This class is thread safe for as long as sending and receiving data
    each has its dedicated thread.

    Note for Python version: for best performance, the use of AsyncTransfer
    is recommended for all regular desktop systems.
    '''
    cdef cpp.ImageTransfer* c_obj

    def __cinit__(self, DeviceInfo device, int buffer_size=1048576, int max_udp_packet_size=1472):
        self.c_obj = new cpp.ImageTransfer(device.c_obj, buffer_size, max_udp_packet_size)

    def __dealloc__(self):
        del self.c_obj

    def is_connected(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::isConnected")
        return self.c_obj.isConnected()

    def disconnect(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::disconnect")
        self.c_obj.disconnect()

    def get_remote_address(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::getRemoteAddress")
        cdef string s = self.c_obj.getRemoteAddress()
        return s.decode("utf-8")

    def receive_image_pair(self):
        '''DEPRECATED: Use receive_image_set() instead.'''
        return self.receive_image_set()

    def receive_image_set(self):
        '''
        Waits for and receives a new image set.

        Returns:
            Returns an ImageSet if a new image set has been received. Otherwise
            None.

        The received image set is only valid until the next call of receive_image_set.
        The method will not block indefinitely, but return after a short timeout.

        You can use receive() as a Python library convenience wrapper
        for more efficient repolling with custom delay and number of attempts.
        '''
        imp = ImageSet()
        ret = self.c_obj.receiveImageSet(imp.c_obj)
        return imp if ret else None

    def receive(self, timeout=-1, poll_delay=0.001):
        '''
        Python: polling wrapper for receive_image_set.

        Args:
            timeout: The timeout in seconds before returning None unless an
                image arrives. A non-positive timeout means to wait forever.
            poll_delay: The sleep delay to enforce after each polling
                attempt.

        Returns:
            An ImageSet if an image set has been received before the timeout.
            None otherwise.

        On desktop systems, use AsyncTransfer instead for best performance.
        '''
        imp = ImageSet()
        t0 = time.time()
        # Poll until a set arrives or the (positive) timeout has elapsed.
        while timeout <= 0 or (time.time() - t0) < timeout:
            ret = self.c_obj.receiveImageSet(imp.c_obj)
            if ret:
                return imp
            time.sleep(poll_delay)
        return None

    def get_num_dropped_frames(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::ImageTransfer::getNumDroppedFrames")
        return self.c_obj.getNumDroppedFrames()


cdef class AsyncTransfer:
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer")
    cdef cpp.AsyncTransfer* c_obj

    def __cinit__(self, DeviceInfo device, int buffer_size=1048576, int max_udp_packet_size=1472):
        self.c_obj = new cpp.AsyncTransfer(device.c_obj, buffer_size, max_udp_packet_size)

    def __dealloc__(self):
        del self.c_obj

    def is_connected(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::isConnected")
        return self.c_obj.isConnected()

    def disconnect(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::disconnect")
        self.c_obj.disconnect()

    def get_remote_address(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::getRemoteAddress")
        cdef string s = self.c_obj.getRemoteAddress()
        return s.decode("utf-8")

    def collect_received_image_pair(self, timeout=-1):
        '''DEPRECATED: Use collect_received_image_set() instead.'''
        return self.collect_received_image_set(timeout)

    def collect_received_image_set(self, timeout=-1):
        '''
        Collects the asynchronously received image.

        Args:
            timeout: The maximum time in seconds for which to wait if no
                image set has been received yet.

        Returns:
            An ImageSet if an image set has been received before the timeout.

        If no image set has been received, this method might block or return None.
        Otherwise the returned image set is valid until the next call.

        If timeout is set to a value < 0, the function will block indefinitely.
        If timeout = 0, the function will return immediately, and if timeout is > 0 then
        the function will block for the given amount of time in seconds. The received
        image set is only valid until the next call of this function.
        '''
        imp = ImageSet()
        ret = self.c_obj.collectReceivedImageSet(imp.c_obj, timeout)
        return imp if ret else None

    def get_num_dropped_frames(self):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::AsyncTransfer::getNumDroppedFrames")
        return self.c_obj.getNumDroppedFrames()


cdef class Reconstruct3D:
    _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::Reconstruct3D")
    cdef cpp.Reconstruct3D c_obj

    def create_point_map_from_disparity_data(self, disp_map_data, width, height, row_stride, q, min_disparity=1, subpixel_factor=16, max_z=0, max_disparity=0xFFF):
        '''
        Reconstructs the 3D location of each pixel in the given disparity map,
        with custom parameters.

        Args:
            disp_map_data: Data of the disparity map (unsigned short array). The
                disparity map is assumed to have a N-bit subpixel resolution.
                This means that each value needs to be divided by the subpixel factor
                to receive the true disparity.
            width, height: Disparity map dimensions
            row_stride: Row stride (i.e. distance between two rows in bytes) of the
                disparity map.
            q: Disparity-to-depth mapping matrix of size 4x4. The matrix is
                stored in a row-wise alignment. Obtain this matrix from your
                camera calibration data.
            min_disparity: The minimum disparity, again with N-bit subpixel
                resolution. Lower disparities will be clamped to this value
                before computing the 3D location (default 1).
            subpixel_factor: Subpixel division factor for disparity value
                (default 16)
            max_z: (Python specific) Filter the numpy array to only return
                points closer than specified value. A non-positive value means
                no filtering (default).
            max_disparity: The maximum value that occurs in the disparity map. Any value
                greater or equal will be marked as invalid.

        Returns:
            A numpy array of size [:,3] containing the 3D points corresponding to the disparity map.

        Please refer to the C++ API docs for further details.
        '''
        cdef int size = width * height * 4
        cdef unsigned short[:, ::1] disp_map_arr = disp_map_data
        # The memoryview requires C-contiguous float32 data.
        cdef float[:, ::1] q_arr = np.ascontiguousarray(q, dtype=np.float32)
        cdef float* point_map_data = self.c_obj.createPointMap(&disp_map_arr[0, 0], width, height, row_stride, &q_arr[0, 0], min_disparity, subpixel_factor, max_disparity)

        # Wrap the returned buffer without copying (four floats per pixel).
        # NOTE(review): without max_z filtering the result is a view into this
        # buffer; presumably it is reused by later calls -- copy if you need
        # to keep it (confirm against the C++ API docs).
        cdef view.array arr = view.array(shape=(size,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
        arr.data = <char*>point_map_data

        np_array = np.asarray(arr)
        np_array = np_array.reshape(height * width, 4)
        np_array = np_array[:, :3]

        if max_z > 0:
            np_array = np_array[np_array[:, 2] < max_z]

        return np_array

    def create_point_map(self, ImageSet image_set, min_disparity=1, max_z=0):
        '''
        Reconstructs the 3D location of each pixel using the disparity map
        and metadata of the given image set.

        Args:
            image_set: Image set containing the disparity map.
            min_disparity: The minimum disparity with 4-bit subpixel resolution.
            max_z: (Python specific) Filter the numpy array to only return
                points closer than specified value. A non-positive value means
                no filtering (default).

        Returns:
            A numpy array of size [:,3] containing the 3D points corresponding to the disparity map.

        Please refer to the C++ API docs for further details.
        '''
        cdef int w = image_set.c_obj.getWidth()
        cdef int h = image_set.c_obj.getHeight()
        cdef int size = w * h * 4
        cdef float* point_map_data = self.c_obj.createPointMap(image_set.c_obj, min_disparity)

        # Wrap the returned buffer without copying (four floats per pixel).
        # NOTE(review): without max_z filtering the result is a view into this
        # buffer; presumably it is reused by later calls -- copy if you need
        # to keep it (confirm against the C++ API docs).
        cdef view.array arr = view.array(shape=(size,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
        arr.data = <char*>point_map_data

        np_array = np.asarray(arr)
        np_array = np_array.reshape(h * w, 4)
        np_array = np_array[:, :3]

        if max_z > 0:
            np_array = np_array[np_array[:, 2] < max_z]

        return np_array

    def create_point_map_and_color_map(self, ImageSet image_set, min_disparity=1, max_z=0):
        '''
        Reconstructs the 3D location of each pixel using the disparity map
        and metadata of the given image set, alongside their colors.

        Args:
            image_set: Image set containing the disparity map.
            min_disparity: The minimum disparity with 4-bit subpixel resolution.
            max_z: (Python specific) Filter the numpy array to only return
                points closer than specified value. A non-positive value means
                no filtering (default).

        Returns:
            Two numpy arrays of identical size [:,3], the first containing the 3D points corresponding
            to the disparity map, and the second one their colors as float RGB triplets (or None if
            the ImageSet is disparity-only).
        '''
        cdef int w = image_set.c_obj.getWidth()
        cdef int h = image_set.c_obj.getHeight()
        cdef int size = w * h * 4
        cdef float* point_map_data = self.c_obj.createPointMap(image_set.c_obj, min_disparity)

        # Wrap the returned buffer without copying (four floats per pixel).
        cdef view.array arr = view.array(shape=(size,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
        arr.data = <char*>point_map_data

        coords = np.asarray(arr)
        coords = coords.reshape(h * w, 4)
        coords = coords[:, :3]

        pix = None
        if image_set.has_image_type(ImageType.IMAGE_LEFT):
            # do_copy=False is fine here: the conversions below copy anyway.
            pix = image_set.get_pixel_data(ImageType.IMAGE_LEFT, force8bit=True, do_copy=False)
            if len(pix.shape)==2:
                pix = np.stack([pix]*3, 2)  # Expand grayscale to rgb triplets
            pix = pix.reshape((-1, 3)).astype(np.float64) / 255.0

        if max_z > 0:
            # Filter colors first, while 'coords' is still unfiltered.
            if pix is not None:
                pix = pix[coords[:, 2] < max_z]
            coords = coords[coords[:, 2] < max_z]

        return coords, pix

    def create_open3d_pointcloud(self, ImageSet image_set, min_disparity=1, max_z=0):
        '''
        Convenience wrapper to directly return an Open3D point cloud for an image set.

        Args:
            image_set: Image set containing the disparity map.
            min_disparity: The minimum disparity with 4-bit subpixel resolution.
            max_z: (Python specific) Filter the point cloud data to only return
                points closer than specified value. A non-positive value means
                no filtering (default).

        Returns:
            An open3d.geometry.PointCloud for the (filtered) coordinates from the ImageSet.
            Contains color information unless the ImageSet was disparity-only.
        '''
        # Imported lazily so that Open3D stays an optional dependency.
        import open3d
        pointmap, colors = self.create_point_map_and_color_map(image_set, min_disparity=min_disparity, max_z=max_z)
        pcd = open3d.geometry.PointCloud(open3d.utility.Vector3dVector(pointmap))
        if colors is not None:
            pcd.colors = open3d.utility.Vector3dVector(colors)
        return pcd

    def create_open3d_rgbd_image(self, ImageSet image_set, min_disparity=1, depth_trunc=3.0, depth_scale=1.0):
        '''
        Convenience wrapper to directly return an Open3D RGBD image for an ImageSet.
        Raises a RuntimeError when called with a disparity-only image set.

        Args:
            image_set: Image set containing the disparity map.
            min_disparity: The minimum disparity with 4-bit subpixel resolution.
            depth_trunc: (Open3D argument, relayed) Filter the depth channel to
                zero-clamp points more distant than the specified value (default 3.0).
            depth_scale: (Open3D argument, relayed) Scale of the depth values
                (default 1.0).

        Returns:
            An open3d.geometry.RGBDImage for the image set.
        '''
        # Imported lazily so that Open3D stays an optional dependency.
        import open3d

        if not image_set.has_image_type(ImageType.IMAGE_LEFT):
            raise RuntimeError('Cannot create an RGBD image - no left image data in ImageSet')

        cdef int w = image_set.c_obj.getWidth()
        cdef int h = image_set.c_obj.getHeight()
        cdef float* z_data = self.c_obj.createZMap(image_set.c_obj, minDisparity=min_disparity, maxDisparity=0xFFF)

        cdef view.array arr = view.array(shape=(h, w,), itemsize=sizeof(float), format="f", mode="c", allocate_buffer=False)
        arr.data = <char*>z_data
        depth = np.asarray(arr).astype(np.float32)  # astype() copies out of the wrapped buffer
        color = image_set.get_pixel_data(ImageType.IMAGE_LEFT, force8bit=True)
        # Use the public open3d.geometry namespace (not open3d.cpu.pybind) so
        # the Image type matches the RGBDImage factory on every Open3D build.
        img = open3d.geometry.RGBDImage.create_from_color_and_depth(
            open3d.geometry.Image(color),
            open3d.geometry.Image(depth),
            depth_scale=depth_scale, depth_trunc=depth_trunc)
        return img

    def project_single_point(self, point_x, point_y, disparity, q, subpix_factor):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::Reconstruct3D::projectSinglePoint", "PYTHON NOTE: Returns a tuple (pointX, pointY, pointZ). Please ignore those C++ reference arguments.")
        cdef float proj_x = 0
        cdef float proj_y = 0
        cdef float proj_z = 0
        # The memoryview requires C-contiguous float32 data.
        cdef float[:, ::1] q_arr = np.ascontiguousarray(q, dtype=np.float32)
        self.c_obj.projectSinglePoint(point_x, point_y, disparity, &q_arr[0, 0], proj_x, proj_y, proj_z, subpix_factor)
        return proj_x, proj_y, proj_z

    def write_ply_file(self, filename, ImageSet image_set, double max_z=sys.float_info.max, bool binary=False):
        _SUBSTITUTE_DOCSTRING_FOR_("visiontransfer::Reconstruct3D::writePlyFile")
        self.c_obj.writePlyFile(filename.encode(), image_set.c_obj, max_z, binary)