Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: 3.7
- uses: dschep/[email protected]
- name: Install and build
run: |
python -m ensurepip --user
python -m pip install --upgrade pip --user
python -m pip install .[dev]
- name: Build with Poetry
- name: Build
run: python setup.py sdist bdist_wheel
- name: Publish distribution 📦 to Test PyPI
uses: pypa/gh-action-pypi-publish@master
Expand Down
3 changes: 1 addition & 2 deletions displayarray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@

__version__ = "1.1.1"

from .window.subscriber_windows import display, breakpoint_display, read_updates
from .frame.frame_publishing import publish_updates_zero_mq, publish_updates_ros
from .window.subscriber_windows import display, breakpoint_display, read_updates, publish_updates
from . import effects
18 changes: 12 additions & 6 deletions displayarray/effects/select_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ def __init__(self, selected_channels: Iterable[int] = None):

def __call__(self, arr):
"""Run the channel selector."""
self.num_input_channels = arr.shape[-1]
out_arr = [
arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels
]
out_arr = np.stack(out_arr, axis=-1)
return out_arr
if isinstance(arr, list):
ars = []
for a in arr:
ars.append(self.__call__(a))
return ars
else:
self.num_input_channels = arr.shape[-1]
out_arr = [
arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels
]
out_arr = np.stack(out_arr, axis=-1)
return out_arr

def enable_mouse_control(self):
"""
Expand Down
148 changes: 16 additions & 132 deletions displayarray/frame/frame_publishing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from displayarray.frame import subscriber_dictionary
from .np_to_opencv import NpCam
from .zmq_to_opencv import ZmqCam
from displayarray._uid import uid_for_source

from typing import Union, Tuple, Optional, Dict, Any, List, Callable
Expand All @@ -37,7 +38,7 @@
def pub_cam_loop_pyv4l2(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
):
"""
Expand All @@ -46,7 +47,7 @@ def pub_cam_loop_pyv4l2(
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status

:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param mjpg: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
Expand Down Expand Up @@ -75,7 +76,7 @@ def pub_cam_loop_pyv4l2(
sub.return_on_no_data = ""
msg = ""

if high_speed and cam.pixel_format != "MJPEG":
if mjpg and cam.pixel_format != "MJPEG":
warnings.warn("Camera does not support high speed.")

now = time.time()
Expand Down Expand Up @@ -108,7 +109,7 @@ def pub_cam_loop_pyv4l2(
def pub_cam_loop_opencv(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
extra: Optional[List[Tuple[int, int]]] = None,
) -> bool:
Expand All @@ -118,17 +119,21 @@ def pub_cam_loop_opencv(
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status

:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param mjpg: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
:return: True if loop ended normally, False if it failed somehow.
"""
name = uid_for_source(cam_id)

cam: Union[NpCam, ZmqCam, cv2.VideoCapture]
if isinstance(cam_id, (int, str)):
cam: Union[NpCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, np.ndarray):
if isinstance(cam_id, str) and cam_id.startswith('tcp'):
cam = ZmqCam(cam_id)
else:
cam = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, (np.ndarray)):
cam = NpCam(cam_id)
else:
raise TypeError(
Expand All @@ -143,7 +148,7 @@ def pub_cam_loop_opencv(
sub.return_on_no_data = ""
msg = ""

if high_speed:
if mjpg:
try:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
except AttributeError:
Expand All @@ -160,7 +165,7 @@ def pub_cam_loop_opencv(
time.sleep(1.0 / (fps_limit - (time.time() - now)))
now = time.time()
(ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ]
if ret is False or not isinstance(frame, np.ndarray):
if ret is False or not isinstance(frame, (np.ndarray, list)):
cam.release()
subscriber_dictionary.CV_CAMS_DICT[name].status_pub.publish("failed")
return False
Expand All @@ -183,7 +188,7 @@ def pub_cam_loop_opencv(
def pub_cam_thread(
cam_id: Union[int, str],
request_ize: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
force_backend="",
) -> threading.Thread:
Expand Down Expand Up @@ -211,129 +216,8 @@ def pub_cam_thread(
pub_cam_loop = pub_cam_loop_opencv

t = threading.Thread(
target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit)
target=pub_cam_loop, args=(cam_id, request_ize, mjpg, fps_limit)
)
uid_dict[name] = t
t.start()
return t


async def publish_updates_zero_mq(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
publishing_address="tcp://127.0.0.1:5600",
prepend_topic="",
flags=0,
copy=True,
track=False,
):
"""Publish frames to ZeroMQ when they're updated."""
import zmq
from displayarray import read_updates

ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind(publishing_address)

if not blocking:
flags |= zmq.NOBLOCK

try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
for vid_name, frame in v.items():
md = dict(
dtype=str(frame.dtype),
shape=frame.shape,
name=prepend_topic + vid_name,
)
s.send_json(md, flags | zmq.SNDMORE)
s.send(frame, flags, copy=copy, track=track)
if fps_limit:
await asyncio.sleep(1.0 / fps_limit)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)


async def publish_updates_ros(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
node_name="displayarray",
publisher_name="npy",
rate_hz=None,
dtype=None,
):
"""Publish frames to ROS when they're updated."""
import rospy
from rospy.numpy_msg import numpy_msg
import std_msgs.msg
from displayarray import read_updates

def get_msg_type(dtype):
if dtype is None:
msg_type = {
np.float32: std_msgs.msg.Float32(),
np.float64: std_msgs.msg.Float64(),
np.bool: std_msgs.msg.Bool(),
np.char: std_msgs.msg.Char(),
np.int16: std_msgs.msg.Int16(),
np.int32: std_msgs.msg.Int32(),
np.int64: std_msgs.msg.Int64(),
np.str: std_msgs.msg.String(),
np.uint16: std_msgs.msg.UInt16(),
np.uint32: std_msgs.msg.UInt32(),
np.uint64: std_msgs.msg.UInt64(),
np.uint8: std_msgs.msg.UInt8(),
}[dtype]
else:
msg_type = (
dtype # allow users to use their own custom messages in numpy arrays
)
return msg_type

publishers: Dict[str, rospy.Publisher] = {}
rospy.init_node(node_name, anonymous=True)
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
if rospy.is_shutdown():
break
for vid_name, frame in v.items():
if vid_name not in publishers:
dty = frame.dtype if dtype is None else dtype
publishers[vid_name] = rospy.Publisher(
publisher_name + vid_name,
numpy_msg(get_msg_type(dty)),
queue_size=10,
)
publishers[vid_name].publish(frame)
if rate_hz:
await asyncio.sleep(1.0 / rate_hz)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
if rospy.core.is_shutdown():
raise rospy.exceptions.ROSInterruptException("rospy shutdown")
10 changes: 5 additions & 5 deletions displayarray/frame/frame_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(
video_source: Union[int, str, np.ndarray] = 0,
callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
force_backend="",
):
Expand All @@ -40,7 +40,7 @@ def __init__(
else:
self.callbacks = callbacks
self.request_size = request_size
self.high_speed = high_speed
self.mjpg = mjpg
self.fps_limit = fps_limit
self.exception_raised = None
self.force_backend = force_backend
Expand Down Expand Up @@ -88,7 +88,7 @@ def loop(self):
t = pub_cam_thread(
self.video_source,
self.request_size,
self.high_speed,
self.mjpg,
self.fps_limit,
self.force_backend,
)
Expand Down Expand Up @@ -266,12 +266,12 @@ async def read_updates_ros(
{
np.float32: std_msgs.msg.Float32(),
np.float64: std_msgs.msg.Float64(),
np.bool: std_msgs.msg.Bool(),
np.bool: std_msgs.msg.Bool(), # type: ignore
np.char: std_msgs.msg.Char(),
np.int16: std_msgs.msg.Int16(),
np.int32: std_msgs.msg.Int32(),
np.int64: std_msgs.msg.Int64(),
np.str: std_msgs.msg.String(),
np.str: std_msgs.msg.String(), # type: ignore
np.uint16: std_msgs.msg.UInt16(),
np.uint32: std_msgs.msg.UInt32(),
np.uint64: std_msgs.msg.UInt64(),
Expand Down
49 changes: 49 additions & 0 deletions displayarray/frame/zmq_to_opencv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Allow OpenCV to handle zmq subscriber addresses as input."""

import cv2
import zmq
from tensorcom.tenbin import decode_buffer # type: ignore


class ZmqCam(object):
"""Add OpenCV camera controls to a numpy array."""

def __init__(self, img):
"""Create a fake camera for OpenCV based on the initial array."""
assert isinstance(img, str)
s = img.split('#')
self.__ctx = zmq.Context()
self.__addr = s[0]
self.__sub = self.__ctx.socket(zmq.SUB)
if len(s) > 1:
self.__topic = bytes(s[1], 'ascii')
self.__sub.setsockopt(zmq.SUBSCRIBE, self.__topic)
else:
self.__topic = b""
self.__sub.connect(self.__addr)

self.__is_opened = True

def set(self, *args, **kwargs):
"""Set CAP_PROP_FRAME_WIDTH or CAP_PROP_FRAME_HEIGHT to scale a numpy array to that size."""
pass

@staticmethod
def get(*args, **kwargs):
"""Get OpenCV args. Currently only a fake CAP_PROP_FRAME_COUNT to fix detecting video ends."""
if args[0] == cv2.CAP_PROP_FRAME_COUNT:
return float("inf")

def read(self):
"""Read back the numpy array in standard "did it work", "the array", OpenCV format."""
r = self.__sub.recv_multipart()
arrs = [decode_buffer(ri) for ri in r[1:]]
return True, arrs

def isOpened(self): # NOSONAR
"""Hack to tell OpenCV we're opened until we call release."""
return self.__is_opened

def release(self):
"""Let OpenCV know we're finished."""
self.__is_opened = False
Loading