Last active
May 26, 2021 01:32
-
-
Save filipinascimento/3f370354fe02e99b0ee1cb8b3389b20e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import required libraries | |
import uvicorn | |
from vidgear.gears.asyncio import WebGear_RTC | |
# import necessary libs | |
import uvicorn, asyncio, cv2 | |
from av import VideoFrame | |
from aiortc import VideoStreamTrack | |
from vidgear.gears.asyncio import WebGear_RTC | |
from vidgear.gears.asyncio.helper import reducer | |
import numpy as np | |
import vtk | |
from vtk.util.numpy_support import vtk_to_numpy | |
""" | |
======================================================= | |
Visualize Interdisciplinary map of the journals network | |
======================================================= | |
The goal of this app is to give an overview of the structure of the journals
network, viewed as a complex network. Each journal is shown as a node, and each
connection indicates a citation between two journals.
""" | |
############################################################################### | |
# First, let's import some useful functions | |
from os.path import join as pjoin | |
from fury import actor, window, colormap as cmap | |
import numpy as np | |
############################################################################### | |
# Then let's download some available datasets. | |
from fury.data.fetcher import fetch_viz_wiki_nw | |
files, folder = fetch_viz_wiki_nw()
# `files` is keyed by file name. The unpacking below relies on the sorted
# names coming out in the order (categories, edges, positions).
categories_file, edges_file, positions_file = sorted(files)

###############################################################################
# We read our datasets
positions = np.loadtxt(pjoin(folder, positions_file))
categories = np.loadtxt(pjoin(folder, categories_file), dtype=str)
# ndmin=2 keeps `edges` as an (n_edges, 2) table even when the file holds a
# single edge; without it a lone row collapses to a 1-D array and can no
# longer be unpacked as (source, target) pairs.
edges = np.loadtxt(pjoin(folder, edges_file), dtype=int, ndmin=2)
###############################################################################
# Every category gets its own color, and every node is painted with the color
# of the category it belongs to.
index2category = np.unique(categories)
category2index = dict(zip(index2category, range(len(index2category))))
categoryColors = cmap.distinguishable_colormap(nb_colors=len(index2category))
colors = np.array(
    [categoryColors[category2index[label]] for label in categories]
)

###############################################################################
# Node sizes: one random radius per node, drawn from [1, 2).
radii = np.random.rand(len(positions)) + 1
###############################################################################
# Let's create our edges now. They indicate a citation between two nodes.
# The color of each edge is the average of the colors of the two nodes that
# it connects.
#
# Indexing with the whole (n_edges, 2) table at once replaces the per-edge
# Python loop: positions[edge_pairs] has shape (n_edges, 2, n_dims), holding
# the source and target coordinates of every edge.
# The reshape makes a single edge (1-D input) or an empty edge list behave
# like any other (n_edges, 2) table.
edge_pairs = np.reshape(edges, (-1, 2))
edgesPositions = positions[edge_pairs]
edgesColors = colors[edge_pairs].mean(axis=1)
###############################################################################
# Our data preparation is ready, it is time to visualize it all. We build two
# actors to represent our data: sphere_actor for the nodes and lines_actor for
# the edges.
sphere_actor = actor.sphere(
    centers=positions,
    colors=colors,
    radii=radii * 0.5,
    theta=8,
    phi=8,
)
lines_actor = actor.line(
    edgesPositions,
    colors=edgesColors,
    opacity=0.1,
)

###############################################################################
# All actors need to be added to a scene, so we build one and add our
# lines_actor and sphere_actor.
scene = window.Scene()
scene.add(lines_actor)
scene.add(sphere_actor)

###############################################################################
# The scene is rendered off screen and captured frame by frame instead of
# being shown in a local window. For a local preview, call
# `window.show(scene, size=(600, 600))` instead.
interactive = False

# Look at the origin from far along +Z. The up direction is given explicitly
# as +Y, because a zero-length view-up vector does not define an orientation.
scene.set_camera(
    position=(0, 0, 1000),
    focal_point=(0.0, 0.0, 0.0),
    view_up=(0.0, 1.0, 0.0),
)

# Off-screen render window that draws the scene.
# `imagesize` is passed to SetSize as (width, height).
render_window = vtk.vtkRenderWindow()
render_window.SetOffScreenRendering(1)
imagesize = (1024, 768)
render_window.AddRenderer(scene)
render_window.SetSize(imagesize[0], imagesize[1])

# Filter that converts the contents of the render window into image data.
window_to_image_filter = vtk.vtkWindowToImageFilter()
window_to_image_filter.SetInput(render_window)
# Tuning options forwarded to WebGear_RTC as keyword arguments.
options = dict(frame_size_reduction=25)

# Create the WebGear_RTC app with logging enabled and no video source; the
# frames are supplied by a custom server assigned to its config later on.
web = WebGear_RTC(**options, logging=True)
# create your own Bare-Minimum Custom Media Server | |
class Custom_RTCServer(VideoStreamTrack):
    """
    Custom media server that streams off-screen renders of the scene.

    Subclass of aiortc's ``VideoStreamTrack``. Each call to :meth:`recv`
    rotates the camera of the module-level ``scene``, captures the render
    window through the module-level ``window_to_image_filter`` and returns
    the capture as an ``av.VideoFrame``.
    """

    def __init__(self, source=None):
        """
        Initialize the track.

        Parameters
        ----------
        source : object, optional
            Accepted for interface compatibility and ignored; frames come
            from the module-level render window, not from a capture source.
        """
        # don't forget this line!
        super().__init__()
        width, height = imagesize
        # Placeholder frame, replaced on the first call to recv().
        # Image arrays are laid out rows first: (height, width, channels).
        self.image = np.random.randint(
            0, 256, (height, width, 3), dtype=np.uint8
        )
        self.ang = 0
        # No capture stream is attached. The attribute is defined so that
        # terminate() can always inspect it.
        self.stream = None

    async def recv(self):
        """
        Render the next view of the scene and return it as a video frame.

        Returns
        -------
        av.VideoFrame
            The captured image, stamped with the timestamp and time base
            obtained from ``next_timestamp()``.
        """
        # NOTE(review): the capture below runs synchronously inside the
        # coroutine, so the event loop waits while a frame is produced.
        # get next timestamp
        pts, time_base = await self.next_timestamp()

        # Rotate the camera by 2 degrees around the focal point per frame.
        scene.GetActiveCamera().Azimuth(2)

        # Mark the filter as modified BEFORE updating, so that this very
        # call re-captures the window instead of reusing the cached output.
        window_to_image_filter.Modified()
        window_to_image_filter.Update()

        vtk_image = window_to_image_filter.GetOutput()
        # GetDimensions() reports (x, y, z) extents, i.e. width comes first.
        width, height, _ = vtk_image.GetDimensions()
        vtk_array = vtk_image.GetPointData().GetScalars()
        components = vtk_array.GetNumberOfComponents()
        pixels = vtk_to_numpy(vtk_array).reshape(height, width, components)

        # VTK stores the bottom row of the image first, while video frames
        # expect the top row first, so flip vertically. The copy also
        # guarantees the contiguous buffer needed to build the frame.
        self.image = np.ascontiguousarray(pixels[::-1])

        # construct `av.frame.Frame` from `numpy.ndarray`
        av_frame = VideoFrame.from_ndarray(self.image, format="rgb24")
        av_frame.pts = pts
        av_frame.time_base = time_base

        # return `av.frame.Frame`
        return av_frame

    def terminate(self):
        """
        Gracefully release the attached stream, if there is one.

        Safe to call repeatedly and on instances that never had a stream.
        """
        # don't forget this function!!!
        stream = getattr(self, "stream", None)
        if stream is None:
            return
        try:
            stream.release()
        except AttributeError:
            # Best effort: the attached object offers no release() method.
            pass
        self.stream = None
# assign the custom media server to the WebGear_RTC config
web.config["server"] = Custom_RTCServer()

try:
    # run this app on Uvicorn server at address http://localhost:8000/
    uvicorn.run(web(), host="localhost", port=8000)
finally:
    # close app safely, even when the server stops because of an error
    web.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment