#!/usr/bin/env python3 #from ctypes import * import ctypes import _ctypes from datetime import datetime import os import json import random import plotly.graph_objects as go import networkx as nx import dash import dash_cytoscape as cyto from dash import Dash, dcc, html from dash.dependencies import Input, Output import dash_bootstrap_components as dbc cyto.load_extra_layouts() external_script = ["https://tailwindcss.com/", {"src": "https://cdn.tailwindcss.com"}] class LogData(ctypes.Structure): _fields_ = [ ("tag", ctypes.c_char_p), ("iface_in", ctypes.c_char_p), ("iface_out", ctypes.c_char_p), ("mac", ctypes.c_char_p), ("dst_ip", ctypes.c_char_p), ("src_ip", ctypes.c_char_p), ("dst_port", ctypes.c_char_p), ("src_port", ctypes.c_char_p), ("proto", ctypes.c_char_p), ("tstamp", ctypes.c_char_p), ("len", ctypes.c_char_p) ] def c_parser(log_line): so_file = "lib/parser_lib.so" iptablesParser = CDLL(so_file) iptablesParser.iptablesParser.argtype = c_char_p iptablesParser.iptablesParser.restype = c_char_p iptablesParser.lineParser.argtype = c_char_p iptablesParser.lineParser.restype = c_char_p parser_arg = log_line.encode('utf-8') # c_return = iptablesParser.iptablesParser(parser_arg) c_return = iptablesParser.lineParser(parser_arg) _ctypes.dlclose(iptablesParser._handle) # iptablesParser.freeme(c_return) print() print("[ Return on Python ]"+"-"*50+"[+]") print(c_return.decode("utf-8")) print(c_return) def file_pointer(): f = open("/var/log/iptables.log", "r") i = 0 for x in f: print() print("*"*100) print("SEQUENCE : ",i) print("*"*100) print(str(i)+" -> "+x) c_parser(str(x)) if i >= 3: break i = i + 1 def struct_process(): path = os.getcwd() clibrary = ctypes.CDLL(os.path.join(path, 'lib/parser_lib.so')) #param_1=("ABC", "CDE") clibrary.main.restype = ctypes.POINTER(LogData) call_lib = clibrary.main() print(call_lib.contents.src_ip.decode('utf-8')) print(call_lib.contents.dst_ip.decode('utf-8')) print(call_lib.contents.src_port.decode('utf-8')) print(call_lib.contents.dst_port.decode('utf-8')) print(call_lib.contents.proto.decode('utf-8')) print(call_lib.contents.iface_in.decode('utf-8')) print(call_lib.contents.iface_out.decode('utf-8')) print(call_lib.contents.len) def line_process(): path = os.getcwd() log_file = "wg_api/iptable_parser/example/iptables.log" p_file = open(os.path.join(path, log_file)) p_lines = p_file.readlines() clibrary = ctypes.CDLL(os.path.join(path, 'wg_api/iptable_parser/lib/parser_lib.so')) clibrary.main.restype = ctypes.POINTER(LogData) clibrary.line_parse.restype = ctypes.POINTER(LogData) clibrary.line_parse.argtype = ctypes.c_char_p test_val = "HERRROOOO" json_dump = [] nodes = [] edges = [] with open(os.path.join(path, log_file)) as p_lines: for line in p_lines: if line.strip(): #print(line) parser_arg = line.encode('utf-8') call_lib = clibrary.line_parse(parser_arg) time_hr = datetime.fromisoformat(call_lib.contents.tstamp.decode('utf-8')) time_hr = time_hr.strftime("%d-%m-%Y %H:%M:%S (%Z)") # print("-"*50) # print("TSTAMP ",time_hr) # print("TAG ",call_lib.contents.tag.decode('utf-8')) # print("SRC ",call_lib.contents.src_ip.decode('utf-8')) # print("DST ",call_lib.contents.dst_ip.decode('utf-8')) # print("LEN ",call_lib.contents.len.decode('utf-8')) # print("IFACE_IN ",call_lib.contents.iface_in.decode('utf-8')) # print("IFACE_OUT ",call_lib.contents.iface_out.decode('utf-8')) # print("PROTO ",call_lib.contents.proto.decode('utf-8')) # if (call_lib.contents.proto != b"ICMP"): # print("SPT ",call_lib.contents.src_port.decode('utf-8')) # print("DPT ",call_lib.contents.dst_port.decode('utf-8')) # print() timestamp = call_lib.contents.tstamp.decode('utf-8') tag = call_lib.contents.tag.decode('utf-8') src_ip = call_lib.contents.src_ip.decode('utf-8') dst_ip = call_lib.contents.dst_ip.decode('utf-8') iface_in = call_lib.contents.iface_in.decode('utf-8') iface_out = call_lib.contents.iface_out.decode('utf-8') proto = call_lib.contents.proto.decode('utf-8') if (call_lib.contents.proto != b"ICMP"): src_port = call_lib.contents.src_port.decode('utf-8') dst_port = call_lib.contents.dst_port.decode('utf-8') else: src_port = "" dst_port = "" tmp_data = { "TimeStamp": timestamp, "Tag": tag, "SourceIP": src_ip, "DestinationIP": dst_ip, "InterafceIN": iface_in, "InterafceOUT": iface_out, "SourcePort": src_port, "DestinationPort": dst_port, "Protocol": proto } if (tag != "VPN-REG"): nodes.append({'id':src_ip, 'label': src_ip}) nodes.append({'id':dst_ip, 'label': dst_ip}) edges.append({'id': src_ip+"-"+dst_ip, 'source':src_ip, 'target': dst_ip, 'label': src_port+"->"+dst_port if proto != "ICMP" else "ICMP"}) json_dump.append(tmp_data) nodes = get_uniq(nodes) edges = get_uniq(edges) #plot_group_graph(nodes, edges) json_data = json.dumps(json_dump, indent=2) return nodes, edges def get_uniq(list): id_list = [] uniq_list = [] for x in list: if (x not in uniq_list) and (x['id'] not in id_list): id_list.append(x['id']) uniq_list.append(x) print("-"*50) for x in uniq_list: print(x) print("-"*50) return uniq_list #_ctypes.dlclose(call_lib._handle) # clibrary.main(param_1) # print(clibrary.main().contents.src_ip) # print(clibrary.main().contents.dst_ip) # file_pointer() # struct_process() def plot_group_graph(): nodes, edges = line_process() # Create a graph G = nx.Graph() G.add_nodes_from(nodes) G.add_edges_from(edges) # Generate random words for node hover data random_words = [random.choice(["Apple", "Banana", "Cherry", "Date", "Fig", "Grape", "Lemon", "Mango", "Orange", "Peach"]) for _ in nodes] # Define node colors based on conditions node_name = {node: "Johan:"+" "+node if node == "192.168.101.113" else node for node in nodes } node_colors = {node: 'turqoise' if node == '192.168.20.2' else ('teal' if node.startswith('192.168') else ('orange' if node.startswith('10.30.') else 'beige')) for node in nodes} # Create a Dash app #app = dash.Dash(__name__) #app = DjangoDash(name='dash_app', external_stylesheets=[dbc.themes.LUX]) #layout={'name': 'cose', 'avoidOverlap': True, 'nodeDimensionsIncludeLabels': True}, layout={ "name": "cose", "idealEdgeLength": 20, "nodeOverlap": 20, "refresh": 30, "fit": True, "padding": 3, "randomize": False, "componentSpacing": 100, "nodeRepulsion": 400000, "edgeElasticity": 100, "nestingFactor": 5, "gravity": 80, "numIter": 1000, "initialTemp": 200, "coolingFactor": 0.95, "minTemp": 1.0, }, app = dash.Dash( __name__, external_scripts=external_script, external_stylesheets=[dbc.themes.FLATLY] ) app.scripts.config.serve_locally = True #positions = [ {"x":random.randint(1000,5000), "y":random.randint(1,100)} for _ in nodes ] elements=[ {'data': {'id': node, 'label': node_name[node], 'random_word': random_word, 'color': node_colors[node]}} for node, random_word in zip(nodes, random_words) ] + [{'data': {'source': source, 'target': target, 'color':'crimson'}} for source, target in edges], # Include the edges stylesheet=[ { 'selector': 'node', 'style': { 'background-color': 'data(color)', "width": "mapData(weight, 40, 80, 20, 60)", 'label': 'data(label)', 'font-size': '12px', 'text-halign': 'center', 'text-valign': 'top', # Adjust the text position to 'top' 'border-width': '2px', 'border-color': 'seagreen', 'shadow-box': '2px', 'content': 'data(label)', } }, { 'selector': 'edge', 'style': { 'width': 2, 'background-opacity': 0.2, # Set the opacity for the parent node 'line-color': 'teal', } }, { 'selector': 'edge:selected', 'style': { 'border': 'crimson', 'border-opacity': 0.2, 'width': 4, 'background-opacity': 0.2, # Set the opacity for the parent node 'line-color': 'crimson', } }, { 'selector': ':parent', 'style': { 'background-opacity': 0.2, # Set the opacity for the parent node } }, { 'selector': 'node:parent', 'style': { 'border-width': 1, # Set the border width for the parent node } }, { 'selector': ':selected', 'style': { 'border-width': 4, # Set the border width for selected nodes 'border-color': 'crimson', # Set the border color for selected nodes } }, ], # Define the app layout app.layout = html.Div([ html.Div(children=[ html.Div(children=[ dcc.Tabs([ dcc.Tab(label='Tab 1',children=[ html.Div(children=[ html.Div(f'Node Details: ', className='p-2'), dcc.Slider(id='slider', min=1, max=500, step=20), html.Button('Reset', id='bt-reset', className='btn bg-teal-500 p-2 text-white hover:bg-teal-600'), dcc.Dropdown(['cose', 'cola', 'random', 'circle'], 'cose', id='graph-mode-dropdown'), dcc.Input( id="input_{}".format("text"), type="text", placeholder="input type {}" ), html.Div(id='out-all-types', className=''), ], className='p-2 space-y-2'), ]), dcc.Tab(label='Tab 2', children=[ html.Div(children=[ html.Div(f'Edge Details: '), html.Div(id='hover-data', className=''), ], className='space-y-2 p-2'), ]), ]), ], className="w-96 flex-none"), html.Div(children=[ dcc.Tabs([ dcc.Tab(label='OAM',children=[ html.Div(children=[ cyto.Cytoscape( id='network-graph', layout=layout[0], zoomingEnabled=True, zoom=0.9, responsive=True, elements=elements[0], style={'width': '100%', 'height': '88vh', 'background':'#dfdfdf'}, stylesheet=stylesheet[0], ) ], className="bg-amber-50 p-4 flex"), ]), dcc.Tab(label='VPN Registration',children=[ html.Div(children=[ cyto.Cytoscape( id='network-graph', layout=layout[0], zoomingEnabled=True, zoom=0.9, responsive=True, elements=elements[0], style={'width': '100%', 'height': '88vh', 'background':'#efefef'}, stylesheet=stylesheet[0], ) ], className="bg-amber-50 p-4"), ]), ]), ], className="w-[calc(100%-24rem)] h-[calc(100%-16rem)]"), ], className="flex w-full h-full"), ]) @app.callback( [Output('network-graph', 'zoom'), Output('network-graph', 'elements')], [Input('bt-reset', 'n_clicks')] ) def reset_layout(n_clicks): print(n_clicks, 'click') return [0.5, elements[0]] @app.callback( Output('network-graph', 'layout'), [Input('graph-mode-dropdown', 'value')] ) def graph_mode(value): print(value) tmp_layout = layout[0] tmp_layout['name'] = value return tmp_layout # @app.callback( # Output('network-graph', 'layout'), # [Input('slider', 'value')] # ) # def update_zoom(value): # print(value) # tmp_layout = layout[0] # tmp_layout['idealEdgeLength'] = value # return tmp_layout @app.callback( Output('hover-data', 'children'), Input('network-graph', 'tapNodeData') ) def display_hover_data(data): if data: return html.Div([ html.Div(f'Node: {data["id"]}', style={'font-weight': 'bold', 'background':'#dfdfdf'}), html.Div(f'Random Word: {data["random_word"]}') ]) else: return '' @app.callback( Output("out-all-types", "children"), Input("input_{}".format('text'), "value"), ) def cb_render(*vals): return html.Div([ html.Div(f'Input: {vals}', style={'font-weight': 'bold', 'background':'#dfdfdf'}), ]) if __name__ == '__main__': app.run_server(debug=True, use_reloader=True) def plot_net_graph(nodes, edges): # Create a graph G = nx.Graph() G.add_nodes_from(nodes) G.add_edges_from(edges) # Generate random words for node hover data random_words = [random.choice(["Apple", "Banana", "Cherry", "Date", "Fig", "Grape", "Lemon", "Mango", "Orange", "Peach"]) for _ in nodes] # Create positions for the nodes using a circular layout pos = nx.circular_layout(G, scale=2) # Create a Plotly figure fig = go.Figure() # Add edges to the figure for edge in edges: x0, y0 = pos[edge[0]] x1, y1 = pos[edge[1]] edge_trace = go.Scatter( x=[x0, x1, None], y=[y0, y1, None], mode='lines', line=dict(width=2, color='gray'), # Set edge color to 'gray' and adjust width hoverinfo='none', # Disable edge hover ) fig.add_trace(edge_trace) # Define node colors based on conditions node_colors = [] for node in nodes: if node == '192.168.20.2': node_colors.append('#22C55E') # Green color elif node.startswith('10.30.1'): node_colors.append('seagreen') # Blue color elif node.startswith('192.168.100') or node.startswith('192.168.101'): node_colors.append('#3B82F6') # Blue color else: node_colors.append('#EF4444') # Red color node_size = [] for node in nodes: if node == '192.168.20.2': node_size.append(30) # Green color elif node.startswith('10.30.1'): node_size.append(25) # Blue color elif node.startswith('192.168.100') or node.startswith('192.168.101'): node_size.append(25) # Blue color else: node_size.append(20) # Red color # Add nodes to the figure with defined colors for node, word, color, size in zip(nodes, random_words, node_colors, node_size): x, y = pos[node] is_192_168 = node.startswith('192.168') node_trace = go.Scatter( x=[x], y=[y], mode='markers', marker=dict( size=size, color=color, # Set color based on the conditions line=dict( width=2, color='black' # Set border color ) ), text=f"Node: {node}
Random Word: {word}", hoverinfo='text' # Display node and random word on hover ) fig.add_trace(node_trace) # Customize the layout fig.update_layout( showlegend=False, title='Denser Network Graph with Larger Circled Nodes', hovermode='closest', # Set hover mode to the nodes only xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), # Remove x-axis values yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), # Remove y-axis values paper_bgcolor='lightgray', # Set a milder background color ) # Set the aspect ratio to make the graph denser fig.update_xaxes(scaleanchor="y", scaleratio=1) # Display the plot app = Dash() app.layout = html.Div([ dcc.Graph( id='my-graph', figure=fig, style = {'display': 'inline-block', 'height': '90vh', 'width': '100%'} ) ]) app.run_server(debug=True, use_reloader=True) # Turn off reloader if inside Jupyter #fig.show() def plot_graph(in_nodes, in_edges): #G = nx.random_geometric_graph(10, 0.25) G = nx.Graph() G.add_nodes_from(in_nodes) G.add_edges_from(in_edges) print("*"*50) print(G.edges()) print(G.nodes()) print("*"*50) edge_x = [] edge_y = [] for edge in G.edges(): print(G.nodes[edge[0]]['pos']) print(G.nodes[edge[1]]['pos']) x0, y0 = G.nodes[edge[0]]['pos'] x1, y1 = G.nodes[edge[1]]['pos'] edge_x.append(x0) edge_x.append(x1) edge_x.append(None) edge_y.append(y0) edge_y.append(y1) edge_y.append(None) edge_trace = go.Scatter( x=edge_x, y=edge_y, line=dict(width=0.5, color='#888'), hoverinfo='none', mode='lines') node_x = [] node_y = [] for node in G.nodes(): x, y = G.nodes[node]['pos'] node_x.append(x) node_y.append(y) node_trace = go.Scatter( x=node_x, y=node_y, mode='markers', hoverinfo='text', marker=dict( showscale=True, # colorscale options #'Greys' | 'YlGnBu' | 'Greens' | 'YlOrRd' | 'Bluered' | 'RdBu' | #'Reds' | 'Blues' | 'Picnic' | 'Rainbow' | 'Portland' | 'Jet' | #'Hot' | 'Blackbody' | 'Earth' | 'Electric' | 'Viridis' | colorscale='YlGnBu', reversescale=True, color=[], size=10, colorbar=dict( thickness=15, title='Node Connections', xanchor='left', titleside='right' ), line_width=2)) node_adjacencies = [] node_text = [] for node, adjacencies in enumerate(G.adjacency()): node_adjacencies.append(len(adjacencies[1])) node_text.append('# of connections: '+str(len(adjacencies[1]))) node_trace.marker.color = node_adjacencies node_trace.text = node_text fig = go.Figure(data=[edge_trace, node_trace], layout=go.Layout( title='Network graph made with Python', titlefont_size=16, showlegend=False, hovermode='closest', margin=dict(b=20,l=5,r=5,t=40), annotations=[ dict( text="Python code: https://plotly.com/ipython-notebooks/network-graphs/", showarrow=False, xref="paper", yref="paper", x=0.005, y=-0.002 ) ], xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)) ) fig.show() #plot_group_graph()