Skip to content

Commit 88e0451

Browse files
committed
feat: implemented kafka multiplayer peer.
1 parent 2b0dcd4 commit 88e0451

File tree

13 files changed

+849
-121
lines changed

13 files changed

+849
-121
lines changed

.vscode/launch.json

+33
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,39 @@
44
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
55
"version": "0.2.0",
66
"configurations": [
7+
{
8+
"type": "lldb",
9+
"request": "launch",
10+
"preLaunchTask": "CMake: build",
11+
"name": "Debug GodotKafka",
12+
"program": "${workspaceFolder}/godot-editor/Godot_v4.3-stable_win64.exe",
13+
"args": [
14+
"--debug",
15+
"--path",
16+
"${workspaceFolder}/demo/client"
17+
],
18+
"cwd": "${workspaceFolder}",
19+
},
20+
// {
21+
// "name": "Debug GodotKafka",
22+
// "type": "godot",
23+
// "request": "launch",
24+
// "project": "${workspaceFolder}/demo/client",
25+
// "scene": "main",
26+
// "editor_path": "${workspaceFolder}/godot-editor/Godot_v4.3-stable_win64.exe",
27+
// "profiling": false,
28+
// "single_threaded_scene": false,
29+
// "debug_avoidance": false,
30+
// "debug_navigation": false,
31+
// "debug_collisions": false,
32+
// "debug_paths": false,
33+
// "debug_stringnames": false,
34+
// "frame_delay": 0,
35+
// "time_scale": 1.0,
36+
// "disable_vsync": false,
37+
// "fixed_fps": 60,
38+
// "additional_options": ""
39+
// },
740
{
841
"type": "lldb",
942
"request": "launch",

.vscode/settings.json

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"godotTools.editorPath.godot4": "d:\\Workspace\\_Personal\\godot-kafka-multiplayer-peer\\godot-editor\\Godot_v4.3-stable_win64.exe"
3+
}

.vscode/tasks.json

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"version": "2.0.0",
3+
"tasks": [
4+
// {
5+
// "label": "CMake: configure",
6+
// "type": "shell",
7+
// "command": "cmake -DCMAKE_BUILD_TYPE=Debug ../..",
8+
// "options": {
9+
// "cwd": "${workspaceFolder}/.sln/Debug"
10+
// },
11+
// },
12+
{
13+
// "dependsOn": "CMake: configure",
14+
"label": "CMake: build",
15+
"type": "shell",
16+
"command": "cmake --build .",
17+
"options": {
18+
"cwd": "${workspaceFolder}/.sln/Debug"
19+
},
20+
}
21+
]
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"requests":[{"kind":"cache","version":2},{"kind":"codemodel","version":2},{"kind":"toolchains","version":1},{"kind":"cmakeFiles","version":1}]}

demo/client/project.godot

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ config_version=5
1010

1111
[application]
1212

13-
config/name="godot cpp template"
13+
config/name="Godot Kafka Demo"
1414
run/main_scene="res://scenes/Init.tscn"
1515
config/features=PackedStringArray("4.3", "Forward Plus")
1616
config/icon="res://icon.svg"

demo/client/scenes/Init.tscn

+20
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,23 @@
66

77
[node name="NetworkManager" type="Node" parent="."]
88
script = ExtResource("1_jbx0p")
9+
10+
[node name="Control" type="Control" parent="."]
11+
layout_mode = 3
12+
anchors_preset = 15
13+
anchor_right = 1.0
14+
anchor_bottom = 1.0
15+
grow_horizontal = 2
16+
grow_vertical = 2
17+
18+
[node name="Label" type="Label" parent="Control"]
19+
layout_mode = 1
20+
anchors_preset = 5
21+
anchor_left = 0.5
22+
anchor_right = 0.5
23+
offset_left = -47.5
24+
offset_right = 47.5
25+
offset_bottom = 23.0
26+
grow_horizontal = 2
27+
text = "Hello World!"
28+
horizontal_alignment = 1
+131-17
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,141 @@
11
extends Node
22

3+
var controlled_peers: Dictionary = {}
4+
5+
var local_ping: int = 0
6+
var ping_delay_secs: int = 5
7+
var ping_timer: Timer
8+
var timeout_peer_secs: int = 5
39

410
# Called when the node enters the scene tree for the first time.
511
func _ready() -> void:
6-
init_network("localhost:9092")
7-
pass
12+
var host: String
13+
# if OS.has_feature("dedicated_server"):
14+
host = "localhost:19092" # Kafka Server
15+
# else:
16+
# host = "ws://localhost:3000/ws" # Websocket Client
17+
init_network(host)
818

919
func init_network(host: String) -> MultiplayerPeer:
10-
var socket: MultiplayerPeer;
11-
#if OS.has_feature("dedicated_server"):
20+
var socket: MultiplayerPeer
21+
# if OS.has_feature("dedicated_server"):
1222
print("Connecting to Kafka Broker: ", host)
13-
var kafka: KafkaMultiplayerPeer = KafkaMultiplayerPeer.new();
14-
kafka.register_admin_channel(host, "super")
15-
kafka.register_publisher("server-to-client", host, 1);
16-
kafka.register_subscriber(["client-to-server"], host, "group-name", 1);
17-
socket = kafka;
18-
#else:
19-
#print("Connecting to a Websocket URL: ", host)
20-
#var websocket: WebSocketMultiplayerPeer = WebSocketMultiplayerPeer.new();
21-
#websocket.create_client(host)
22-
#socket = websocket;
23-
#
23+
var kafka: KafkaMultiplayerPeer = KafkaMultiplayerPeer.new()
24+
kafka.kafka_log.connect(_on_kafka_logs)
25+
# Setup Internal Channels...
26+
# These channels do not communicate from client to server, instead this communicates between server to only edge internally.
27+
# This allows the edge server to tell the game servers that a client has connected or disconnected.
28+
var internal_producer_metadata: KafkaPublisherMetadata = KafkaPublisherMetadata.new()
29+
internal_producer_metadata.set_brokers(host)
30+
internal_producer_metadata.set_topic("internal_to_edge")
31+
kafka.register_internal_publisher(internal_producer_metadata)
32+
33+
var internal_consumer_metadata: KafkaSubscriberMetadata = KafkaSubscriberMetadata.new()
34+
internal_consumer_metadata.set_brokers(host)
35+
internal_consumer_metadata.set_topics(["internal_to_game"])
36+
internal_consumer_metadata.set_group_id("group-name")
37+
internal_consumer_metadata.set_offset_reset(KafkaSubscriberMetadata.OffsetReset.EARLIEST)
38+
kafka.register_internal_subscriber(internal_consumer_metadata)
39+
40+
# Setup Producer
41+
# This communicates directly to the clients via kafka topics.
42+
var producer_metadata: KafkaPublisherMetadata = KafkaPublisherMetadata.new()
43+
producer_metadata.set_brokers(host)
44+
producer_metadata.set_topic("server-to-client")
45+
kafka.register_publisher(producer_metadata, 1)
46+
47+
# Setup Consumer
48+
# This listens from the clients via the kafka topics.
49+
var consumer_metadata: KafkaSubscriberMetadata = KafkaSubscriberMetadata.new()
50+
consumer_metadata.set_brokers(host)
51+
consumer_metadata.set_topics(["client-to-server"])
52+
consumer_metadata.set_group_id("group-name")
53+
consumer_metadata.set_offset_reset(KafkaSubscriberMetadata.OffsetReset.EARLIEST)
54+
kafka.register_subscriber(consumer_metadata, 1)
55+
56+
kafka.peer_connected.connect(_server_on_connection)
57+
kafka.peer_disconnected.connect(_server_on_disconnection)
58+
59+
socket = kafka
60+
61+
# Initialize the ping timer for the server.
62+
ping_timer = Timer.new()
63+
ping_timer.timeout.connect(_on_ping_caller)
64+
ping_timer.wait_time = ping_delay_secs
65+
add_child(ping_timer)
66+
ping_timer.start()
67+
# else:
68+
# print("Connecting to a Websocket URL: ", host)
69+
# var websocket: WebSocketMultiplayerPeer = WebSocketMultiplayerPeer.new()
70+
# websocket.create_client(host)
71+
# websocket.peer_connected.connect(_on_connected)
72+
# socket = websocket
73+
2474
# Set the Multiplayer API to be pointed on the peer above.
2575
# At this point, the peer should be polling data.
26-
multiplayer.multiplayer_peer = socket;
27-
return socket;
76+
multiplayer.multiplayer_peer = socket
77+
78+
return socket
79+
80+
func _on_kafka_logs(log_severity: int, message: String) -> void:
81+
print("[KAFKA::%s] %s" % [str(log_severity), message])
82+
83+
func _on_ping_caller() -> void:
84+
print("[SERVER] Pinging all clients...")
85+
86+
# Check timeouts for peers.
87+
for controlled_peer in controlled_peers.keys():
88+
var last_ping_timestamp = controlled_peers[controlled_peer]
89+
var now: float = Time.get_unix_time_from_system()
90+
if now > last_ping_timestamp + ping_delay_secs + timeout_peer_secs:
91+
multiplayer.multiplayer_peer.disconnect_peer(controlled_peer, true)
92+
93+
ping_all()
94+
95+
func _server_on_connection(source: int) -> void:
96+
print("[SERVER] Client connected: ", source);
97+
98+
func _server_on_disconnection(source: int) -> void:
99+
print("[SERVER] Client disconnected: ", source)
100+
101+
func _on_connected():
102+
print("connected...")
103+
login.rpc("Hello", "World")
104+
105+
@rpc("authority")
106+
func login_callback(success: bool) -> void:
107+
if multiplayer.is_server():
108+
printerr("Server should not be processing this message...")
109+
return
110+
print("Login Successfully: ", success)
111+
112+
@rpc("any_peer")
113+
func login(username: String, password: String) -> void:
114+
if !multiplayer.is_server():
115+
printerr("Client should not be processing this message...")
116+
return
117+
118+
print("Sending Login...")
119+
login_callback.rpc_id(multiplayer.get_remote_sender_id(), true)
120+
121+
func ping_all() -> void:
122+
if !multiplayer.is_server():
123+
return
124+
125+
for controlled_peer in controlled_peers.keys():
126+
var last_timestamp = controlled_peers[controlled_peer]
127+
ping.rpc_id(controlled_peer, last_timestamp)
128+
129+
@rpc("any_peer")
130+
func ping(last_timestamp: int) -> void:
131+
if multiplayer.is_server():
132+
return
133+
pong.rpc()
134+
135+
@rpc("any_peer")
136+
func pong() -> void:
137+
if !multiplayer.is_server():
138+
return
139+
140+
var peer: int = multiplayer.get_remote_sender_id()
141+
controlled_peers[peer] = Time.get_ticks_msec()

0 commit comments

Comments
 (0)