iptables-parser/wrapper.py
2023-10-27 00:52:12 +07:00

492 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
#from ctypes import *
import ctypes
import _ctypes
from datetime import datetime
import os
import json
import random
import plotly.graph_objects as go
import networkx as nx
import dash
import dash_cytoscape as cyto
from dash import Dash, dcc, html
from dash.dependencies import Input, Output
cyto.load_extra_layouts()
external_script = ["https://tailwindcss.com/", {"src": "https://cdn.tailwindcss.com"}]
class LogData(ctypes.Structure):
_fields_ = [
("tag", ctypes.c_char_p),
("iface_in", ctypes.c_char_p),
("iface_out", ctypes.c_char_p),
("mac", ctypes.c_char_p),
("dst_ip", ctypes.c_char_p),
("src_ip", ctypes.c_char_p),
("dst_port", ctypes.c_char_p),
("src_port", ctypes.c_char_p),
("proto", ctypes.c_char_p),
("tstamp", ctypes.c_char_p),
("len", ctypes.c_char_p)
]
def c_parser(log_line):
so_file = "lib/parser_lib.so"
iptablesParser = CDLL(so_file)
iptablesParser.iptablesParser.argtype = c_char_p
iptablesParser.iptablesParser.restype = c_char_p
iptablesParser.lineParser.argtype = c_char_p
iptablesParser.lineParser.restype = c_char_p
parser_arg = log_line.encode('utf-8')
# c_return = iptablesParser.iptablesParser(parser_arg)
c_return = iptablesParser.lineParser(parser_arg)
_ctypes.dlclose(iptablesParser._handle)
# iptablesParser.freeme(c_return)
print()
print("[ Return on Python ]"+"-"*50+"[+]")
print(c_return.decode("utf-8"))
print(c_return)
def file_pointer():
f = open("/var/log/iptables.log", "r")
i = 0
for x in f:
print()
print("*"*100)
print("SEQUENCE : ",i)
print("*"*100)
print(str(i)+" -> "+x)
c_parser(str(x))
if i >= 3:
break
i = i + 1
def struct_process():
path = os.getcwd()
clibrary = ctypes.CDLL(os.path.join(path, 'lib/parser_lib.so'))
#param_1=("ABC", "CDE")
clibrary.main.restype = ctypes.POINTER(LogData)
call_lib = clibrary.main()
print(call_lib.contents.src_ip.decode('utf-8'))
print(call_lib.contents.dst_ip.decode('utf-8'))
print(call_lib.contents.src_port.decode('utf-8'))
print(call_lib.contents.dst_port.decode('utf-8'))
print(call_lib.contents.proto.decode('utf-8'))
print(call_lib.contents.iface_in.decode('utf-8'))
print(call_lib.contents.iface_out.decode('utf-8'))
print(call_lib.contents.len)
def line_process():
path = os.getcwd()
log_file = "example/iptables.log"
p_file = open(os.path.join(path, log_file))
p_lines = p_file.readlines()
clibrary = ctypes.CDLL(os.path.join(path, 'lib/parser_lib.so'))
clibrary.main.restype = ctypes.POINTER(LogData)
clibrary.line_parse.restype = ctypes.POINTER(LogData)
clibrary.line_parse.argtype = ctypes.c_char_p
test_val = "HERRROOOO"
json_dump = []
nodes = []
edges = []
with open(os.path.join(path, log_file)) as p_lines:
for line in p_lines:
#print(line)
parser_arg = line.encode('utf-8')
call_lib = clibrary.line_parse(parser_arg)
time_hr = datetime.fromisoformat(call_lib.contents.tstamp.decode('utf-8'))
time_hr = time_hr.strftime("%d-%m-%Y %H:%M:%S (%Z)")
# print("-"*50)
# print("TSTAMP ",time_hr)
# print("TAG ",call_lib.contents.tag.decode('utf-8'))
# print("SRC ",call_lib.contents.src_ip.decode('utf-8'))
# print("DST ",call_lib.contents.dst_ip.decode('utf-8'))
# print("LEN ",call_lib.contents.len.decode('utf-8'))
# print("IFACE_IN ",call_lib.contents.iface_in.decode('utf-8'))
# print("IFACE_OUT ",call_lib.contents.iface_out.decode('utf-8'))
# print("PROTO ",call_lib.contents.proto.decode('utf-8'))
# if (call_lib.contents.proto != b"ICMP"):
# print("SPT ",call_lib.contents.src_port.decode('utf-8'))
# print("DPT ",call_lib.contents.dst_port.decode('utf-8'))
# print()
timestamp = call_lib.contents.tstamp.decode('utf-8')
tag = call_lib.contents.tag.decode('utf-8')
src_ip = call_lib.contents.src_ip.decode('utf-8')
dst_ip = call_lib.contents.dst_ip.decode('utf-8')
iface_in = call_lib.contents.iface_in.decode('utf-8')
iface_out = call_lib.contents.iface_out.decode('utf-8')
proto = call_lib.contents.proto.decode('utf-8')
if (call_lib.contents.proto != b"ICMP"):
src_port = call_lib.contents.src_port.decode('utf-8')
dst_port = call_lib.contents.dst_port.decode('utf-8')
else:
src_port = ""
dst_port = ""
tmp_data = {
"TimeStamp": timestamp,
"Tag": tag,
"SourceIP": src_ip,
"DestinationIP": dst_ip,
"InterafceIN": iface_in,
"InterafceOUT": iface_out,
"SourcePort": src_port,
"DestinationPort": dst_port,
"Protocol": proto
}
nodes.append(src_ip)
nodes.append(dst_ip)
edges.append((src_ip, dst_ip))
json_dump.append(tmp_data)
nodes = get_uniq(nodes)
edges = get_uniq(edges)
plot_group_graph(nodes, edges)
json_data = json.dumps(json_dump, indent=2)
return json_data
def get_uniq(list):
uniq_list = []
for x in list:
if x not in uniq_list:
uniq_list.append(x)
print("-"*50)
for x in uniq_list:
print(x)
print("-"*50)
return uniq_list
#_ctypes.dlclose(call_lib._handle)
# clibrary.main(param_1)
# print(clibrary.main().contents.src_ip)
# print(clibrary.main().contents.dst_ip)
# file_pointer()
# struct_process()
def plot_group_graph(nodes, edges):
# Create a graph
G = nx.Graph()
G.add_nodes_from(nodes)
G.add_edges_from(edges)
# Generate random words for node hover data
random_words = [random.choice(["Apple", "Banana", "Cherry", "Date", "Fig", "Grape", "Lemon", "Mango", "Orange", "Peach"]) for _ in nodes]
# Define node colors based on conditions
node_colors = {node: 'red' if node not in ['192.168.20.2', '192.168.100.62', '192.168.100.28', '192.168.101.112'] else ('#176069' if node == '192.168.20.2' else 'blue') for node in nodes}
# Create a Dash app
#app = dash.Dash(__name__)
app = dash.Dash(
__name__,
external_scripts=external_script,
)
app.scripts.config.serve_locally = True
positions = [ {"x":random.randint(1,1000), "y":random.randint(1,30000)} for _ in nodes ]
# Define the app layout
app.layout = html.Div([
html.Div(children=[
html.Div(children=[
html.Div(f'Edge Details: ', style={'font-weight': 'bold', 'background':'#dfdfdf'}),
html.Div(id='hover-data', className='bg-gray-500'),
], className="w-96 flex-none bg-amber-500"),
html.Div([
cyto.Cytoscape(
id='network-graph',
layout={'name': 'cose', 'avoidOverlap': True, 'nodeDimensionsIncludeLabels': True},
elements=[
{'data': {'id': node, 'label': node, 'random_word': random_word, 'color': node_colors[node], 'position':pos}} for node, random_word, pos in zip(nodes, random_words, positions)
] + [{'data': {'source': source, 'target': target}} for source, target in edges], # Include the edges
style={'width': '100%', 'height': '94vh'},
stylesheet=[
{
'selector': 'node',
'style': {
'background-color': 'data(color)',
'label': 'data(label)',
'font-size': '12px',
'text-halign': 'center',
'text-valign': 'top', # Adjust the text position to 'top'
'border-width': '1px',
'border-color': 'gray',
'shadow-box': '2px',
'content': 'data(label)',
}
},
{
'selector': 'edge',
'style': {
'width': 2,
'line-color': 'gray',
}
},
{
'selector': ':parent',
'style': {
'background-opacity': 0.2, # Set the opacity for the parent node
}
},
{
'selector': 'node:parent',
'style': {
'border-width': 1, # Set the border width for the parent node
}
},
{
'selector': ':selected',
'style': {
'border-width': 2, # Set the border width for selected nodes
'border-color': 'aqua', # Set the border color for selected nodes
}
},
],
className="bg-surface-200 flex-none",
),
]),], className="flex w-full h-full"),
])
@app.callback(
Output('hover-data', 'children'),
Input('network-graph', 'mouseoverNodeData')
)
def display_hover_data(data):
if data:
return html.Div([
html.Div(f'Node: {data["id"]}', style={'font-weight': 'bold', 'background':'#dfdfdf'}),
html.Div(f'Random Word: {data["random_word"]}')
])
else:
return ''
if __name__ == '__main__':
app.run_server(debug=False)
def plot_net_graph(nodes, edges):
# Create a graph
G = nx.Graph()
G.add_nodes_from(nodes)
G.add_edges_from(edges)
# Generate random words for node hover data
random_words = [random.choice(["Apple", "Banana", "Cherry", "Date", "Fig", "Grape", "Lemon", "Mango", "Orange", "Peach"]) for _ in nodes]
# Create positions for the nodes using a circular layout
pos = nx.circular_layout(G, scale=2)
# Create a Plotly figure
fig = go.Figure()
# Add edges to the figure
for edge in edges:
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_trace = go.Scatter(
x=[x0, x1, None],
y=[y0, y1, None],
mode='lines',
line=dict(width=2, color='gray'), # Set edge color to 'gray' and adjust width
hoverinfo='none', # Disable edge hover
)
fig.add_trace(edge_trace)
# Define node colors based on conditions
node_colors = []
for node in nodes:
if node == '192.168.20.2':
node_colors.append('#22C55E') # Green color
elif node.startswith('10.30.1'):
node_colors.append('seagreen') # Blue color
elif node.startswith('192.168.100') or node.startswith('192.168.101'):
node_colors.append('#3B82F6') # Blue color
else:
node_colors.append('#EF4444') # Red color
node_size = []
for node in nodes:
if node == '192.168.20.2':
node_size.append(30) # Green color
elif node.startswith('10.30.1'):
node_size.append(25) # Blue color
elif node.startswith('192.168.100') or node.startswith('192.168.101'):
node_size.append(25) # Blue color
else:
node_size.append(20) # Red color
# Add nodes to the figure with defined colors
for node, word, color, size in zip(nodes, random_words, node_colors, node_size):
x, y = pos[node]
is_192_168 = node.startswith('192.168')
node_trace = go.Scatter(
x=[x],
y=[y],
mode='markers',
marker=dict(
size=size,
color=color, # Set color based on the conditions
line=dict(
width=2,
color='black' # Set border color
)
),
text=f"Node: {node}<br>Random Word: {word}",
hoverinfo='text' # Display node and random word on hover
)
fig.add_trace(node_trace)
# Customize the layout
fig.update_layout(
showlegend=False,
title='Denser Network Graph with Larger Circled Nodes',
hovermode='closest', # Set hover mode to the nodes only
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), # Remove x-axis values
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), # Remove y-axis values
paper_bgcolor='lightgray', # Set a milder background color
)
# Set the aspect ratio to make the graph denser
fig.update_xaxes(scaleanchor="y", scaleratio=1)
# Display the plot
app = Dash()
app.layout = html.Div([
dcc.Graph(
id='my-graph',
figure=fig,
style = {'display': 'inline-block', 'height': '90vh', 'width': '100%'}
)
])
app.run_server(debug=True, use_reloader=True) # Turn off reloader if inside Jupyter
#fig.show()
def plot_graph(in_nodes, in_edges):
#G = nx.random_geometric_graph(10, 0.25)
G = nx.Graph()
G.add_nodes_from(in_nodes)
G.add_edges_from(in_edges)
print("*"*50)
print(G.edges())
print(G.nodes())
print("*"*50)
edge_x = []
edge_y = []
for edge in G.edges():
print(G.nodes[edge[0]]['pos'])
print(G.nodes[edge[1]]['pos'])
x0, y0 = G.nodes[edge[0]]['pos']
x1, y1 = G.nodes[edge[1]]['pos']
edge_x.append(x0)
edge_x.append(x1)
edge_x.append(None)
edge_y.append(y0)
edge_y.append(y1)
edge_y.append(None)
edge_trace = go.Scatter(
x=edge_x, y=edge_y,
line=dict(width=0.5, color='#888'),
hoverinfo='none',
mode='lines')
node_x = []
node_y = []
for node in G.nodes():
x, y = G.nodes[node]['pos']
node_x.append(x)
node_y.append(y)
node_trace = go.Scatter(
x=node_x, y=node_y,
mode='markers',
hoverinfo='text',
marker=dict(
showscale=True,
# colorscale options
#'Greys' | 'YlGnBu' | 'Greens' | 'YlOrRd' | 'Bluered' | 'RdBu' |
#'Reds' | 'Blues' | 'Picnic' | 'Rainbow' | 'Portland' | 'Jet' |
#'Hot' | 'Blackbody' | 'Earth' | 'Electric' | 'Viridis' |
colorscale='YlGnBu',
reversescale=True,
color=[],
size=10,
colorbar=dict(
thickness=15,
title='Node Connections',
xanchor='left',
titleside='right'
),
line_width=2))
node_adjacencies = []
node_text = []
for node, adjacencies in enumerate(G.adjacency()):
node_adjacencies.append(len(adjacencies[1]))
node_text.append('# of connections: '+str(len(adjacencies[1])))
node_trace.marker.color = node_adjacencies
node_trace.text = node_text
fig = go.Figure(data=[edge_trace, node_trace],
layout=go.Layout(
title='Network graph made with Python',
titlefont_size=16,
showlegend=False,
hovermode='closest',
margin=dict(b=20,l=5,r=5,t=40),
annotations=[ dict(
text="Python code: <a href='https://plotly.com/ipython-notebooks/network-graphs/'> https://plotly.com/ipython-notebooks/network-graphs/</a>",
showarrow=False,
xref="paper", yref="paper",
x=0.005, y=-0.002 ) ],
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))
)
fig.show()
# Push Config
line_process()
#plot_graph()