Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/common/KM_CommonClasses.pas
Original file line number Diff line number Diff line change
Expand Up @@ -511,19 +511,24 @@ procedure TKMemoryStream.WriteBytes(const Value: TBytes);
class procedure TKMemoryStream.AsyncSaveToFileAndFree(var aStream: TKMemoryStream; const aFileName: string; aWorkerThread: TKMWorkerThread);
var
localStream: TKMemoryStream;
{$IFDEF WDC}
task: TKMWorkerThreadTask;
{$ENDIF}
begin
localStream := aStream;
aStream := nil; //So caller doesn't use it by mistake

{$IFDEF WDC}
aWorkerThread.QueueWork(procedure
task := TKMWorkerThreadTask.Create(procedure
begin
try
localStream.SaveToFile(aFileName);
finally
localStream.Free;
end;
end, 'SaveToFile');

aWorkerThread.Enqueue(task);
{$ELSE}
try
LocalStream.SaveToFile(aFileName);
Expand All @@ -538,19 +543,22 @@ class procedure TKMemoryStream.AsyncSaveToFileCompressedAndFree(var aStream: TKM
aWorkerThread: TKMWorkerThread);
var
localStream: TKMemoryStream;
task: TKMWorkerThreadTask;
begin
localStream := aStream;
aStream := nil; //So caller doesn't use it by mistake

{$IFDEF WDC}
aWorkerThread.QueueWork(procedure
task := TKMWorkerThreadTask.Create(procedure
begin
try
localStream.SaveToFileCompressed(aFileName, aMarker);
finally
localStream.Free;
end;
end, 'SaveToFileCompressed ' + aMarker);

aWorkerThread.Enqueue(task);
{$ELSE}
try
LocalStream.SaveToFileCompressed(aFileName, aMarker);
Expand All @@ -565,6 +573,9 @@ class procedure TKMemoryStream.AsyncSaveStreamsToFileAndFree(var aMainStream, aS
const aMarker1, aMarker2: string; aWorkerThread: TKMWorkerThread);
var
localSubStream1, localSubStream2, localMainStream: TKMemoryStream;
{$IFDEF WDC}
task: TKMWorkerThreadTask;
{$ENDIF}
begin
localMainStream := aMainStream;
localSubStream1 := aSubStream1;
Expand All @@ -574,7 +585,7 @@ class procedure TKMemoryStream.AsyncSaveStreamsToFileAndFree(var aMainStream, aS
aSubStream2 := nil; //So caller doesn't use it by mistake

{$IFDEF WDC}
aWorkerThread.QueueWork(procedure
task := TKMWorkerThreadTask.Create(procedure
begin
try
localMainStream.AppendStream(localSubStream1, aMarker1);
Expand All @@ -586,6 +597,8 @@ class procedure TKMemoryStream.AsyncSaveStreamsToFileAndFree(var aMainStream, aS
localMainStream.Free;
end;
end, 'SaveStreamsToFile ' + aMarker1 + ' ' + aMarker2);

aWorkerThread.Enqueue(task);
{$ELSE}
try
mainStream.AppendStream(localStream1, aMarker1);
Expand Down
108 changes: 63 additions & 45 deletions src/common/KM_WorkerThread.pas
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,49 @@ interface
uses
Classes, SysUtils, Generics.Collections;

// procedure TKMWorkLoggerCallback(aJobName: String);

type
TKMWorkerThreadTask = class
ITKMWorkerThreadTask = class abstract(TInterfacedObject)
procedure exec; virtual; abstract;
end;

TKMWorkerThreadTaskBase = class abstract(ITKMWorkerThreadTask)
protected
WorkName: string;
public
constructor Create(const aWorkName: String);
end;

TKMWorkerThreadTask = class(TKMWorkerThreadTaskBase)
private
Proc: TProc;
Callback: TProc<String>;
public
constructor Create(aProc: TProc; aCallback: TProc<String> = nil; aWorkName: string = ''); overload;
constructor Create(aProc: TProc; aCallback: aWorkName: string = ''); overload;

procedure exec; override;
end;

TKMWorkerThread = class(TThread)
private
fWorkerThreadName: string;
fWorkCompleted: Boolean;
fTaskQueue: TQueue<TKMWorkerThreadTask>;
fTaskQueue: TQueue<ITKMWorkerThreadTask>;

procedure NameThread; overload;
procedure NameThread(aThreadName: string); overload;
function GetBaseThreadName: string;
public
//Special mode for exception handling. Runs work synchronously inside QueueWork
//Special mode for exception handling. Runs work synchronously inside Enqueue
fSynchronousExceptionMode: Boolean;

constructor Create(const aThreadName: string = '');
destructor Destroy; override;
procedure Execute; override;

procedure QueueWorkAndLog(aProc: TProc; aWorkName: string = '');
procedure QueueWork(aProc: TProc; aWorkName: string = ''); overload;
procedure QueueWork(aProc: TProc; aCallback: TProc<String> = nil; aWorkName: string = ''); overload;
procedure Enqueue(aTask: ITKMWorkerThreadTask);
procedure WaitForAllWorkToComplete;
end;

Expand All @@ -55,6 +71,30 @@ implementation
KM_Log;


constructor TKMWorkerThreadTaskBase.Create(const aWorkName: String);
begin
WorkName := aWorkName;
end;

constructor TKMWorkerThreadTask.Create(aProc: TProc; aCallback: TProc<String> = nil; aWorkName: string = '');
begin
inherited Create(aWorkName);
Proc := aProc;
Callback := aCallback;
end;

constructor TKMWorkerThreadTask.Create(aProc: TProc; aWorkName: string = '');
begin
Create(aProc, nil, aWorkName);
end;

procedure TKMWorkerThreadTask.exec;
begin
Proc();
if Assigned(Callback) then
Callback(WorkName);
end;

{ TKMWorkerThread }
constructor TKMWorkerThread.Create(const aThreadName: string = '');
begin
Expand All @@ -71,7 +111,7 @@ constructor TKMWorkerThread.Create(const aThreadName: string = '');

fWorkCompleted := False;
fSynchronousExceptionMode := False;
fTaskQueue := TQueue<TKMWorkerThreadTask>.Create;
fTaskQueue := TQueue<ITKMWorkerThreadTask>.Create;
end;

destructor TKMWorkerThread.Destroy;
Expand Down Expand Up @@ -120,7 +160,7 @@ procedure TKMWorkerThread.NameThread(aThreadName: string);

procedure TKMWorkerThread.Execute;
var
job: TKMWorkerThreadTask;
job: ITKMWorkerThreadTask;
loopRunning: Boolean;
threadName: string;
begin
Expand Down Expand Up @@ -162,61 +202,39 @@ procedure TKMWorkerThread.Execute;
if job <> nil then
begin
NameThread(threadName);
job.Proc();

if Assigned(job.Callback) then
job.Callback(job.WorkName);

job.exec;
FreeAndNil(job);
end;

NameThread;
end;
end;


procedure TKMWorkerThread.QueueWorkAndLog(aProc: TProc; aWorkName: string = '');
begin
QueueWork(aProc, procedure(aJobName: String)
begin
gLog.MultithreadLogging := True;
try
gLog.AddTime(Format('Job ''%s'' is completed', [aJobName]));
finally
gLog.MultithreadLogging := False;
end;
end, aWorkName);
end;


procedure TKMWorkerThread.QueueWork(aProc: TProc; aWorkName: string = '');
begin
QueueWork(aProc, nil, aWorkName);
end;


procedure TKMWorkerThread.QueueWork(aProc: TProc; aCallback: TProc<String> = nil; aWorkName: string = '');
var
job: TKMWorkerThreadTask;
// procedure TKMWorkLoggerCallback(aJobName: String);
// begin
// gLog.MultithreadLogging := True;
// try
// gLog.AddTime(Format('Job ''%s'' is completed', [aJobName]));
// finally
// gLog.MultithreadLogging := False;
// end;
// end;

procedure TKMWorkerThread.Enqueue(aTask: ITKMWorkerThreadTask);
begin
if fSynchronousExceptionMode then
begin
aProc();
aTask.exec;
end
else
begin
if Finished then
raise Exception.Create('Worker thread not running in TKMWorkerThread.QueueWork');

job := TKMWorkerThreadTask.Create;
job.Proc := aProc;
job.Callback := aCallback;
job.WorkName := aWorkName;
raise Exception.Create('Worker thread not running in TKMWorkerThread.Enqueue');

TMonitor.Enter(fTaskQueue);
try
fWorkCompleted := False;
fTaskQueue.Enqueue(job);
fTaskQueue.Enqueue(aTask);

TMonitor.Pulse(fTaskQueue);
finally
Expand Down
14 changes: 11 additions & 3 deletions src/game/KM_Game.pas
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,7 @@ procedure TKMGame.AutoSave(aTimestamp: TDateTime);
{$IFDEF WDC}
var
localIsMultiPlayerOrSpec: Boolean;
task: TKMWorkerThreadTask;
{$ENDIF}
begin
Save(AUTOSAVE_SAVE_NAME, aTimestamp, fAutoSaveWorkerThreadHolder.Worker); //Save to temp file
Expand All @@ -1424,10 +1425,11 @@ procedure TKMGame.AutoSave(aTimestamp: TDateTime);
{$IFDEF WDC}
//Avoid accessing Self from async thread, copy required states to local variables
localIsMultiPlayerOrSpec := fParams.IsMultiPlayerOrSpec;
fAutoSaveWorkerThreadHolder.Worker.QueueWork(procedure
task := TKMWorkerThreadTask.Create(procedure
begin
DoAutoSaveRename(localIsMultiPlayerOrSpec);
end, 'AutoSaveRename');
fAutoSaveWorkerThreadHolder.Worker.Enqueue(task);
{$ELSE}
DoAutoSaveRename(fParams.IsMultiPlayerOrSpec);
{$ENDIF}
Expand Down Expand Up @@ -2228,14 +2230,15 @@ procedure TKMGame.SaveGameToStream(aTimestamp: TDateTime; aHeaderStream, aBodySt
procedure TKMGame.PrepareSaveFolder(const aPathName: String; aSaveByPlayer: Boolean; aSaveWorkerThread: TKMWorkerThread);
var
path: string;
task: TKMWorkerThreadTask;
begin
path := aPathName;
//Makes the folders in case they were deleted.
//Should do before save Minimap file for MP game
if (aPathName <> '') then
begin
// We can make directories in async too, since all save parts are made in async now
aSaveWorkerThread.QueueWork(procedure
task := TKMWorkerThreadTask.Create(procedure
begin
path := ExtractFilePath(path);
if DirectoryExists(path) then
Expand All @@ -2255,6 +2258,7 @@ procedure TKMGame.PrepareSaveFolder(const aPathName: String; aSaveByPlayer: Bool
else
ForceDirectories(path);
end, 'Prepare save dir');
aSaveWorkerThread.Enqueue(task);
end;
end;

Expand Down Expand Up @@ -2359,6 +2363,7 @@ procedure TKMGame.Save(const aSaveName: UnicodeString; aTimestamp: TDateTime; aS
I, index: Integer;
fullPath, rngPath, mpLocalDataPath, newSaveName, loadFrom: UnicodeString;
saveByPlayer: Boolean;
task: TKMWorkerThreadTask;
begin
{$IFDEF PERFLOG}
gPerfLogs.SectionEnter(psGameSaveWait);
Expand All @@ -2382,12 +2387,15 @@ procedure TKMGame.Save(const aSaveName: UnicodeString; aTimestamp: TDateTime; aS
try
// Emulate slow save in the async save thread
if SLOW_GAME_SAVE_ASYNC then
aSaveWorkerThread.QueueWork(procedure
begin
task := TKMWorkerThreadTask.Create(procedure
begin
Sleep(10000);
end,
'Slow Game Save'
);
aSaveWorkerThread.Enqueue(task);
end;

//Convert name to full path+name
fullPath := SaveName(aSaveName, EXT_SAVE_MAIN, fParams.IsMultiplayer);
Expand Down
10 changes: 8 additions & 2 deletions src/game/KM_GameSavePoints.pas
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ procedure TKMSavePointCollection.NewSavePointAsyncAndFree(var aStream: TKMemoryS
{$IFDEF WDC}
var
localStream: TKMemoryStream;
task: TKMWorkerThreadTask;
{$ENDIF}
begin
{$IFDEF WDC}
Expand All @@ -262,7 +263,7 @@ procedure TKMSavePointCollection.NewSavePointAsyncAndFree(var aStream: TKMemoryS
// Increase save threads counter in main thread
AtomicIncrement(fAsyncThreadsCnt);

aWorkerThread.QueueWork(
task := TKMWorkerThreadTask.Create(
procedure
var
S: TKMemoryStream;
Expand All @@ -285,6 +286,8 @@ procedure TKMSavePointCollection.NewSavePointAsyncAndFree(var aStream: TKMemoryS
AtomicDecrement(fAsyncThreadsCnt);
end, 'NewSavePointAsyncAndFree');

aWorkerThread.Enqueue(task);

{$ELSE}
NewSavePoint(aStream, aTick);
{$ENDIF}
Expand Down Expand Up @@ -363,6 +366,7 @@ procedure TKMSavePointCollection.SaveToFileAsync(const aFileName: UnicodeString;
{$IFNDEF WDC}
var
localStream: TKMemoryStream;
task: TKMWorkerThreadTask;
{$ENDIF}
begin
if Self = nil then Exit;
Expand All @@ -371,7 +375,7 @@ procedure TKMSavePointCollection.SaveToFileAsync(const aFileName: UnicodeString;
// Increase save threads counter in main thread
AtomicIncrement(fAsyncThreadsCnt);

aWorkerThread.QueueWork(
task := TKMWorkerThreadTask.Create(
procedure
var
localStream: TKMemoryStream;
Expand All @@ -386,6 +390,8 @@ procedure TKMSavePointCollection.SaveToFileAsync(const aFileName: UnicodeString;
localStream.Free;
end;
end, 'Save SavePoints');

aWorkerThread.Enqueue(task);
{$ELSE}
localStream := TKMemoryStreamBinary.Create;
try
Expand Down
Loading