Skip to content

Experimental Datatypes

A list of experimental multi-modal datatypes that can be passed into a WandB loggable dictionary.

InteractiveVideo

Bases: Html

Format a video such that it is logged in an interactive format with controls to contrast the default uncontrollable gif offered by wandb.Video.

import wandb
from wandb_addons.datatype import InteractiveVideo

with wandb.init(project="test", entity="geekyrakshit"):
    wandb.log({"Test-Video": InteractiveVideo("video.mp4")})
import numpy as np

import wandb
from wandb_addons.datatype import InteractiveVideo

with wandb.init(project="test", entity="geekyrakshit"):
    frames = [np.ones((256, 256, 3)) * 255] * 10 + [np.zeros((256, 256, 3))] * 10
    wandb.log({"Test-Video": InteractiveVideo(frames)})

Parameters:

Name Type Description Default
video Union[str, List[array]]

The path to a video file or a list of numpy arrays of shape (H, W, C) corresponding to the frames of the video.

required
video_format str

Format of the video.

'mp4'
fps int

Frame-rate of the video, applicable only when logging list of numpy arrays.

30

Returns:

Type Description
Union[Html, None]

A wandb.Html object that can be passed to a WandB loggable dictionary.

Source code in wandb_addons/datatype/video.py
class InteractiveVideo(wandb.Html):
    """Format a video such that it is logged in an interactive format with controls
    to contrast the default uncontrollable gif offered by
    [`wandb.Video`](https://docs.wandb.ai/ref/python/data-types/video).

    !!! example "Example WandB Run"
        [https://wandb.ai/geekyrakshit/test/runs/vi00rpc5](https://wandb.ai/geekyrakshit/test/runs/vi00rpc5)

    === "Logging a video file"

        ```python
        import wandb
        from wandb_addons.datatype import InteractiveVideo

        with wandb.init(project="test", entity="geekyrakshit"):
            wandb.log({"Test-Video": InteractiveVideo("video.mp4")})
        ```

    === "Logging a list of numpy arrays corresponding to frames"

        ```python
        import numpy as np

        import wandb
        from wandb_addons.datatype import InteractiveVideo

        with wandb.init(project="test", entity="geekyrakshit"):
            frames = [np.ones((256, 256, 3)) * 255] * 10 + [np.zeros((256, 256, 3))] * 10
            wandb.log({"Test-Video": InteractiveVideo(frames)})
        ```

    Arguments:
        video (Union[str, List[np.ndarray]]): The path to a video file or a list of
            numpy arrays of shape `(H, W, C)` corresponding to the frames of the video.
        video_format (str): Format of the video.
        fps (int): Frame-rate of the video, applicable only when logging list of
            numpy arrays.

    Returns:
        (Union[wandb.Html, None]): A `wandb.Html` object that can be passed to a WandB
            loggable dictionary.

    Raises:
        FileNotFoundError: If `video` is a string that does not point to an existing
            file.
    """

    def __init__(
        self,
        video: Union[str, List[np.ndarray]],
        video_format: str = "mp4",
        fps: int = 30,
    ) -> None:
        if isinstance(video, str):
            # Fail loudly on a bad path. Previously this case fell through with
            # `encoded_string` unassigned and crashed with an UnboundLocalError
            # when the HTML template was formatted.
            if not os.path.isfile(video):
                raise FileNotFoundError(f"Video file not found: {video}")
            with open(video, "rb") as video_file:
                encoded_string = base64.b64encode(video_file.read()).decode("utf-8")
        elif isinstance(video, list):
            # presumably returns the encoded video as a string suitable for the
            # HTML template -- verify against create_video_from_np_arrays
            encoded_string = create_video_from_np_arrays(video, fps)
        else:
            wandb.termwarn("Unable to log video", repeat=False)
            # NOTE: returning here skips `super().__init__`, so the instance is
            # left without the state that `wandb.Html.__init__` would set up.
            return
        html_string = HTML_VIDEO_FORMAT.format(
            encoded_string=encoded_string, format=video_format
        )
        super().__init__(html_string, inject=True)

RGBDPointCloud

Bases: Object3D

Format an RGB image and a depthmap such that it is logged as an interactive 3d point cloud.

Logging an RGB Image and a Depthmap as a Point Cloud

from PIL import Image

import wandb
from wandb_addons.datatype import RGBDPointCloud

with wandb.init(project="test"):
    rgb_image = Image.open("./docs/assets/sample_image.jpg")
    depth_image = Image.open("./docs/assets/sample_depth.png")
    wandb.log({"Test-RGBD": RGBDPointCloud(rgb_image, depth_image)})

Parameters:

Name Type Description Default
rgb_image Union[str, Image, array]

The RGB image. Either a path to an image file, or a PIL Image, or a numpy array can be passed.

required
depth_image Union[str, Image, array]

The Depthmap. Either a path to an image file, or a PIL Image, or a numpy array can be passed.

required
camera_intrinsic_parameters Dict[str, float]

The camera intrinsic parameters as a dictionary. Defaults to o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault if not specified.

None
Source code in wandb_addons/datatype/rgbd.py
class RGBDPointCloud(wandb.Object3D):
    """Format an RGB image and a depthmap such that it is logged as an interactive 3d point cloud.

    !!! example "Example WandB Run"
        [https://wandb.ai/geekyrakshit/test/runs/8ftwuuwf](https://wandb.ai/geekyrakshit/test/runs/8ftwuuwf)

    !!! example "Logging an RGB Image and a Depthmap as a Point Cloud"
        ```python
        from PIL import Image

        import wandb
        from wandb_addons.datatype import RGBDPointCloud

        with wandb.init(project="test"):
            rgb_image = Image.open("./docs/assets/sample_image.jpg")
            depth_image = Image.open("./docs/assets/sample_depth.png")
            wandb.log({"Test-RGBD": RGBDPointCloud(rgb_image, depth_image)})
        ```

    Arguments:
        rgb_image (Union[str, Image.Image, np.ndarray]): The RGB image. Either a path to
            an image file, or a PIL Image, or a numpy array can be passed.
        depth_image (Union[str, Image.Image, np.ndarray]): The Depthmap. Either a path to
            an image file, or a PIL Image, or a numpy array can be passed.
        camera_intrinsic_parameters (Dict[str, float]): The camera intrinsic parameters
            as a dictionary. Defaults to
            `o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault` if not
            specified.

    Raises:
        FileNotFoundError: If an image is given as a string that does not point to an
            existing file.
    """

    def __init__(
        self,
        rgb_image: Union[str, Image.Image, np.ndarray],
        depth_image: Union[str, Image.Image, np.ndarray],
        camera_intrinsic_parameters: Union[Dict[str, float], None] = None,
        **kwargs,
    ) -> None:
        # NOTE(review): the value is handed unchanged to
        # `o3d.camera.PinholeCameraIntrinsic(...)` in `create_point_cloud`; confirm
        # that a plain dict is actually accepted there.
        self.camera_intrinsic_parameters = (
            o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault
            if camera_intrinsic_parameters is None
            else camera_intrinsic_parameters
        )
        rgb_image_numpy, point_cloud = self.create_point_cloud(rgb_image, depth_image)
        normalized_point_cloud = self.normalize_point_cloud(point_cloud)
        colored_point_cloud = self.get_colored_point_cloud(
            rgb_image_numpy, normalized_point_cloud
        )
        super().__init__(colored_point_cloud, **kwargs)

    @staticmethod
    def _load_as_numpy(image, label: str):
        """Return `image` as a numpy array, loading it from disk if it is a path."""
        if isinstance(image, str):
            # A bad path used to fall through and fail later with an unrelated
            # AttributeError/TypeError; report the real problem instead.
            if not os.path.isfile(image):
                raise FileNotFoundError(f"{label} file not found: {image}")
            # Convert while the file is open so the handle is released promptly.
            with Image.open(image) as opened_image:
                return np.array(opened_image)
        if isinstance(image, Image.Image):
            return np.array(image)
        return image

    def _get_images_as_numpy_arrays(
        self,
        rgb_image: Union[str, Image.Image, np.ndarray],
        depth_image: Union[str, Image.Image, np.ndarray],
    ):
        """Convert both inputs to numpy arrays and validate the RGB image shape.

        Returns:
            A `(rgb_image, depth_image)` tuple of numpy arrays.
        """
        rgb_image = self._load_as_numpy(rgb_image, "RGB image")
        depth_image = self._load_as_numpy(depth_image, "Depth image")
        # Kept as asserts so the exception type seen by callers is unchanged.
        assert (
            len(rgb_image.shape) == 3
        ), "Batched pair of RGB images and Depthmaps are not yet supported"
        assert rgb_image.shape[-1] == 3, "RGB image must have 3 channels"
        return rgb_image, depth_image

    def create_point_cloud(
        self,
        rgb_image: Union[str, Image.Image, np.ndarray],
        depth_image: Union[str, Image.Image, np.ndarray],
    ):
        """Build an Open3D point cloud from an RGB image and a depthmap.

        Returns:
            A tuple of the RGB image as a numpy array and the point coordinates
            as a numpy array.
        """
        rgb_image, depth_image = self._get_images_as_numpy_arrays(
            rgb_image, depth_image
        )
        # Keep the numpy version: it is needed later to colour the points.
        rgb_image_numpy = rgb_image
        rgb_image = o3d.geometry.Image(rgb_image)
        depth_image = o3d.geometry.Image(depth_image)
        rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
            rgb_image, depth_image, convert_rgb_to_intensity=False
        )
        camera_intrinsic = o3d.camera.PinholeCameraIntrinsic(
            self.camera_intrinsic_parameters
        )
        point_cloud = o3d.geometry.PointCloud.create_from_rgbd_image(
            rgbd_image, camera_intrinsic
        )
        # Negate the Y and Z axes of every point.
        point_cloud.transform(
            [[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]
        )
        return rgb_image_numpy, np.asarray(point_cloud.points)

    def normalize_point_cloud(self, point_cloud):
        """Rescale each coordinate axis of `point_cloud` to the range `[0, 1]`.

        An axis on which all points share the same value is mapped to `0` instead
        of dividing by zero.
        """
        min_values = np.min(point_cloud, axis=0)
        max_values = np.max(point_cloud, axis=0)
        range_values = max_values - min_values
        # Guard against zero extent along an axis, which would otherwise
        # produce NaN/inf coordinates.
        safe_range_values = np.where(range_values == 0, 1, range_values)
        normalized_point_cloud = (point_cloud - min_values) / safe_range_values
        return normalized_point_cloud

    def get_colored_point_cloud(self, rgb_image_numpy, normalized_point_cloud):
        """Append per-pixel RGB values to the point coordinates.

        Returns:
            A numpy array with the coordinates followed by the 3 colour channels
            along the last axis.
        """
        # NOTE(review): this assumes the point cloud holds exactly one point per
        # image pixel, in the same order -- confirm that Open3D does not drop
        # pixels (e.g. ones with invalid depth), otherwise the concatenation
        # below fails on mismatched lengths.
        rgb_image_numpy = rgb_image_numpy.reshape(-1, 3)
        colored_point_cloud = np.concatenate(
            (normalized_point_cloud, rgb_image_numpy), axis=-1
        )
        return colored_point_cloud

VoxelizedPointCloud

Bases: Object3D

Voxelizes a high-resolution point cloud and formats it as a wandb-loggable 3D mesh.

Example WandB Run

Logging the voxelized mesh of this point cloud takes up ~29 MB of space on the wandb run, whereas logging the raw point cloud takes up ~700 MB of space.

https://wandb.ai/geekyrakshit/test/runs/w2nrw85q

import wandb
from wandb_addons.datatype import VoxelizedPointCloud

with wandb.init(project="test"):
    wandb.log(
        {"Test-Point-Cloud": VoxelizedPointCloud("2021_heerlen_table.las")}
    )
import laspy as lp
import numpy as np

import wandb
from wandb_addons.datatype import VoxelizedPointCloud

point_cloud = lp.read("2021_heerlen_table.las")
numpy_point_cloud = np.vstack((point_cloud.x, point_cloud.y, point_cloud.z)).transpose()
numpy_color_cloud = (
    np.vstack((point_cloud.red, point_cloud.green, point_cloud.blue)).transpose()
    / 65535
)

with wandb.init(project="test"):
    wandb.log(
        {"Test-Point-Cloud": VoxelizedPointCloud(numpy_point_cloud, numpy_color_cloud)}
    )

Parameters:

Name Type Description Default
point_cloud Optional[Union[str, np.array]]

The point cloud. Either a path to a laspy file, or a numpy array of shape (N, 3) where N is the number of points.

None
colors Optional[array]

The colors of the point cloud. A numpy array of shape (N, 3) consisting of color values corresponding to the point cloud in the range [0, 1]. This only needs to be specified for point clouds passed as numpy arrays.

None
voxel_size_percentage Optional[float]

The size of each voxel as a percentage of the maximum edge of the point cloud.

0.5
voxel_precision Optional[int]

The precision of the voxel size.

4
Source code in wandb_addons/datatype/voxelized_pcd.py
class VoxelizedPointCloud(wandb.Object3D):
    """Voxelizes a high-resolution point cloud and formats it as a wandb-loggable 3D mesh.

    !!! example "Example WandB Run"
        Logging the voxelized mesh of
        [this point cloud](https://drive.google.com/file/d/1Zr1y8BSYRHBKxvs_nUXo2LSQr2i5ulgj/view?usp=sharing)
        takes up ~29 MB of space on the wandb run, whereas logging the raw
        point cloud takes up ~700 MB of space.

        [https://wandb.ai/geekyrakshit/test/runs/w2nrw85q](https://wandb.ai/geekyrakshit/test/runs/w2nrw85q)

    === "From Laspy File"

        ```python
        import wandb
        from wandb_addons.datatype import VoxelizedPointCloud

        with wandb.init(project="test"):
            wandb.log(
                {"Test-Point-Cloud": VoxelizedPointCloud("2021_heerlen_table.las")}
            )
        ```

    === "From Numpy Array"

        ```python
        import laspy as lp
        import numpy as np

        import wandb
        from wandb_addons.datatype import VoxelizedPointCloud

        point_cloud = lp.read("2021_heerlen_table.las")
        numpy_point_cloud = np.vstack((point_cloud.x, point_cloud.y, point_cloud.z)).transpose()
        numpy_color_cloud = (
            np.vstack((point_cloud.red, point_cloud.green, point_cloud.blue)).transpose()
            / 65535
        )

        with wandb.init(project="test"):
            wandb.log(
                {"Test-Point-Cloud": VoxelizedPointCloud(numpy_point_cloud, numpy_color_cloud)}
            )

        ```

    !!! Info "Reference"
        [How to Automate Voxel Modelling of 3D Point Cloud with Python](https://towardsdatascience.com/how-to-automate-voxel-modelling-of-3d-point-cloud-with-python-459f4d43a227)

    Arguments:
        point_cloud (Optional[Union[str, np.ndarray]]): The point cloud. Either a path to
            a laspy file, or a numpy array of shape `(N, 3)` where `N` is the number of
            points.
        colors (Optional[np.ndarray]): The colors of the point cloud. A numpy array of
            shape `(N, 3)` consisting of color values corresponding to the point cloud
            in the range `[0, 1]`. This only needs to be specified for point
            clouds passed as numpy arrays.
        voxel_size_percentage (Optional[float]): The size of each voxel as a percentage
            of the maximum edge of the point cloud.
        voxel_precision (Optional[int]): The precision of the voxel size.

    Raises:
        RuntimeError: If there is no active wandb run to write the mesh file into.
        TypeError: If `point_cloud` is neither a path string nor a numpy array.
    """

    def __init__(
        self,
        point_cloud: Optional[Union[str, np.ndarray]] = None,
        colors: Optional[np.ndarray] = None,
        voxel_size_percentage: Optional[float] = 0.5,
        voxel_precision: Optional[int] = 4,
        **kwargs
    ) -> None:
        # The mesh is written into the run directory, so check for an active run
        # before doing the (potentially expensive) voxelization.
        if wandb.run is None:
            raise RuntimeError(
                "VoxelizedPointCloud requires an active wandb run; "
                "call wandb.init() first"
            )
        o3d_point_cloud = self.build_open3d_point_cloud(point_cloud, colors)
        voxel_mesh = self.voxelize_point_cloud(
            o3d_point_cloud, voxel_size_percentage, voxel_precision
        )
        voxel_file = os.path.join(wandb.run.dir, "voxelized_point_cloud.glb")
        o3d.io.write_triangle_mesh(voxel_file, voxel_mesh)
        # Pass the path instead of `open(voxel_file)`: the handle was never
        # closed and opened a binary .glb file in text mode.
        super().__init__(voxel_file, **kwargs)

    def build_open3d_point_cloud(self, point_cloud, colors):
        """Create an Open3D point cloud from a laspy file path or a numpy array.

        For a file path the colors are read from the file and divided by 65535;
        the `colors` argument is only used for numpy input.

        Raises:
            TypeError: If `point_cloud` is neither a string nor a numpy array.
        """
        if isinstance(point_cloud, str):
            las_data = lp.read(point_cloud)
            points = np.vstack((las_data.x, las_data.y, las_data.z)).transpose()
            colors = (
                np.vstack((las_data.red, las_data.green, las_data.blue)).transpose()
                / 65535
            )
        elif isinstance(point_cloud, np.ndarray):
            points = point_cloud
        else:
            # Previously this returned None implicitly and failed later with an
            # AttributeError inside `voxelize_point_cloud`.
            raise TypeError(
                "point_cloud must be a path to a laspy file or a numpy array, "
                f"got {type(point_cloud).__name__}"
            )
        o3d_point_cloud = o3d.geometry.PointCloud()
        o3d_point_cloud.points = o3d.utility.Vector3dVector(points)
        if colors is not None:
            o3d_point_cloud.colors = o3d.utility.Vector3dVector(colors)
        return o3d_point_cloud

    def voxelize_point_cloud(
        self, o3d_point_cloud, voxel_size_percentage, voxel_precision
    ):
        """Convert a point cloud into a triangle mesh made of one cube per voxel.

        The voxel size is `voxel_size_percentage` percent of the longest edge of
        the point cloud's bounding box, rounded to `voxel_precision` decimals.
        """
        voxel_size = max(
            o3d_point_cloud.get_max_bound() - o3d_point_cloud.get_min_bound()
        ) * (voxel_size_percentage / 100.0)
        voxel_size = round(voxel_size, voxel_precision)
        voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(
            o3d_point_cloud, voxel_size=voxel_size
        )
        voxels = voxel_grid.get_voxels()
        voxel_mesh = o3d.geometry.TriangleMesh()
        # Build the mesh in grid-index space: one unit cube per voxel, painted
        # with the voxel colour and placed at the voxel's grid index.
        for v in voxels:
            cube = o3d.geometry.TriangleMesh.create_box(width=1, height=1, depth=1)
            cube.paint_uniform_color(v.color)
            cube.translate(v.grid_index, relative=False)
            voxel_mesh += cube
        # Shift, scale to the real voxel size, then move to the grid origin so
        # the mesh ends up in the coordinate frame of the input point cloud.
        voxel_mesh.translate([0.5, 0.5, 0.5], relative=True)
        voxel_mesh.scale(voxel_size, [0, 0, 0])
        voxel_mesh.merge_close_vertices(1e-7)
        voxel_mesh.translate(voxel_grid.origin, relative=True)
        return voxel_mesh