Skip to content

Commit 918228a

Browse files
author
Robin VAN DE MERGHEL
committed
feat: Add pilot legacy logging
1 parent 746d6b4 commit 918228a

File tree

3 files changed

+109
-88
lines changed

3 files changed

+109
-88
lines changed

Pilot/dirac-pilot.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,25 +63,30 @@
6363
# print the buffer, so we have a "classic' logger back in sync.
6464
sys.stdout.write(bufContent)
6565
# now the remote logger.
66-
remote = pilotParams.pilotLogging and (pilotParams.loggerURL is not None)
67-
if remote:
66+
if pilotParams.pilotLogging:
6867
# In a remote logger enabled Dirac version we would have some classic logger content from a wrapper,
6968
# which we passed in:
7069
receivedContent = ""
7170
if not sys.stdin.isatty():
7271
receivedContent = sys.stdin.read()
7372
log = RemoteLogger(
7473
pilotParams.loggerURL,
75-
"Pilot",
74+
useServerCertificate=pilotParams.useServerCertificate,
75+
name="Pilot",
7676
bufsize=pilotParams.loggerBufsize,
7777
pilotUUID=pilotParams.pilotUUID,
7878
debugFlag=pilotParams.debugFlag,
79-
wnVO=pilotParams.wnVO,
8079
)
8180
log.info("Remote logger activated")
82-
log.buffer.write(receivedContent)
81+
log.buffer.write(log.format_to_json(
82+
"INFO",
83+
receivedContent,
84+
))
8385
log.buffer.flush()
84-
log.buffer.write(bufContent)
86+
log.buffer.write(log.format_to_json(
87+
"INFO",
88+
bufContent,
89+
))
8590
else:
8691
log = Logger("Pilot", debugFlag=pilotParams.debugFlag)
8792

@@ -103,7 +108,7 @@
103108
log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions))
104109

105110
log.info("Executing commands: %s" % str(pilotParams.commands))
106-
111+
remote = pilotParams.pilotLogging
107112
if remote:
108113
# It's safer to cancel the timer here. Each command has got its own logger object with a timer cancelled by the
109114
# finaliser. No need for a timer in the "else" code segment below.
@@ -122,5 +127,8 @@
122127
log.error("Command %s could not be instantiated" % commandName)
123128
# send the last message and abandon ship.
124129
if remote:
125-
log.buffer.flush()
130+
log.buffer.flush(force=True)
126131
sys.exit(-1)
132+
133+
log.info("Pilot tasks finished.")
134+
log.buffer.flush(force=True)

Pilot/pilotCommands.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,29 @@ def wrapper(self):
8787

8888
except SystemExit as exCode: # or Exception ?
8989
# controlled exit
90+
if self.pp.pilotLogging:
91+
try:
92+
self.log.error(str(exCode))
93+
self.log.error(traceback.format_exc())
94+
self.log.buffer.flush(force=True)
95+
except Exception as exc:
96+
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
97+
raise
98+
99+
# If we don't have a remote logger
90100
pRef = self.pp.pilotReference
91101
self.log.info(
92-
"Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
102+
"Flushing the logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
93103
)
94104
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()).
95-
try:
96-
sendMessage(self.log.url, self.log.pilotUUID, "finaliseLogs", {"retCode": str(exCode)})
97-
except Exception as exc:
98-
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
99-
raise
105+
100106
except Exception as exc:
101107
# unexpected exit: document it and bail out.
102-
self.log.error(str(exc))
103-
self.log.error(traceback.format_exc())
108+
if self.pp.pilotLogging:
109+
# Force flush if it's a remote logger
110+
self.log.buffer.flush(force=True)
111+
else:
112+
self.log.buffer.flush()
104113
raise
105114
finally:
106115
self.log.buffer.cancelTimer()

Pilot/pilotTools.py

Lines changed: 76 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -518,15 +518,15 @@ class RemoteLogger(Logger):
518518

519519
def __init__(
520520
self,
521-
url,
521+
url, # Not used yet
522+
useServerCertificate,
522523
name="Pilot",
523524
debugFlag=False,
524525
pilotOutput="pilot.out",
525526
isPilotLoggerOn=True,
526527
pilotUUID="unknown",
527528
flushInterval=10,
528529
bufsize=1000,
529-
wnVO="unknown",
530530
):
531531
"""
532532
c'tor
@@ -537,34 +537,48 @@ def __init__(
537537
self.url = url
538538
self.pilotUUID = pilotUUID
539539
self.isPilotLoggerOn = isPilotLoggerOn
540-
sendToURL = partial(sendMessage, url, pilotUUID, wnVO, "sendMessage")
540+
sendToURL = partial(sendMessage, useServerCertificate, pilotUUID)
541541
self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval)
542542

543-
def debug(self, msg, header=True, _sendPilotLog=False):
544-
# TODO: Send pilot log remotely?
543+
def format_to_json(self, level, message):
544+
545+
escaped = json.dumps(message)[1:-1] # remove outer quotes
546+
547+
# Split on escaped newlines
548+
splitted_message = escaped.split("\\n")
549+
550+
output = []
551+
for mess in splitted_message:
552+
if mess:
553+
output.append({
554+
"timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
555+
"severity": level,
556+
"message": mess,
557+
"scope": self.name
558+
})
559+
return output
560+
561+
def debug(self, msg, header=True):
545562
super(RemoteLogger, self).debug(msg, header)
546563
if (
547564
self.isPilotLoggerOn and self.debugFlag
548565
): # the -d flag activates this debug flag in CommandBase via PilotParams
549-
self.sendMessage(self.messageTemplate.format(level="DEBUG", message=msg))
566+
self.sendMessage(self.format_to_json(level="DEBUG", message=msg))
550567

551-
def error(self, msg, header=True, _sendPilotLog=False):
552-
# TODO: Send pilot log remotely?
568+
def error(self, msg, header=True):
553569
super(RemoteLogger, self).error(msg, header)
554570
if self.isPilotLoggerOn:
555-
self.sendMessage(self.messageTemplate.format(level="ERROR", message=msg))
571+
self.sendMessage(self.format_to_json(level="ERROR", message=msg))
556572

557-
def warn(self, msg, header=True, _sendPilotLog=False):
558-
# TODO: Send pilot log remotely?
573+
def warn(self, msg, header=True):
559574
super(RemoteLogger, self).warn(msg, header)
560575
if self.isPilotLoggerOn:
561-
self.sendMessage(self.messageTemplate.format(level="WARNING", message=msg))
576+
self.sendMessage(self.format_to_json(level="WARNING", message=msg))
562577

563-
def info(self, msg, header=True, _sendPilotLog=False):
564-
# TODO: Send pilot log remotely?
578+
def info(self, msg, header=True):
565579
super(RemoteLogger, self).info(msg, header)
566580
if self.isPilotLoggerOn:
567-
self.sendMessage(self.messageTemplate.format(level="INFO", message=msg))
581+
self.sendMessage(self.format_to_json(level="INFO", message=msg))
568582

569583
def sendMessage(self, msg):
570584
"""
@@ -576,7 +590,7 @@ def sendMessage(self, msg):
576590
:rtype: None
577591
"""
578592
try:
579-
self.buffer.write(msg + "\n")
593+
self.buffer.write(msg)
580594
except Exception as err:
581595
super(RemoteLogger, self).error("Message not sent")
582596
super(RemoteLogger, self).error(str(err))
@@ -621,34 +635,31 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10):
621635
self._timer.start()
622636
else:
623637
self._timer = None
624-
self.output = StringIO()
638+
self.output = []
625639
self.bufsize = bufsize
626640
self._nlines = 0
627641
self.senderFunc = senderFunc
628642

629643
@synchronized
630-
def write(self, text):
644+
def write(self, content_json):
631645
"""
632646
Write text to a string buffer. Newline characters are counted and number of lines in the buffer
633647
is increased accordingly.
634648
635-
:param text: text string to write
636-
:type text: str
649+
:param content_json: Json to send, format following format_to_json
650+
:type content_json: list[dict]
637651
:return: None
638652
:rtype: None
639653
"""
640-
# reopen the buffer in a case we had to flush a partially filled buffer
641-
if self.output.closed:
642-
self.output = StringIO()
643-
self.output.write(text)
644-
self._nlines += max(1, text.count("\n"))
654+
655+
self.output.extend(content_json)
656+
657+
try:
658+
self._nlines += max(1, len(content_json))
659+
except Exception:
660+
raise ValueError(content_json)
645661
self.sendFullBuffer()
646662

647-
@synchronized
648-
def getValue(self):
649-
content = self.output.getvalue()
650-
return content
651-
652663
@synchronized
653664
def sendFullBuffer(self):
654665
"""
@@ -658,22 +669,19 @@ def sendFullBuffer(self):
658669

659670
if self._nlines >= self.bufsize:
660671
self.flush()
661-
self.output = StringIO()
672+
self.output = []
662673

663674
@synchronized
664-
def flush(self):
675+
def flush(self, force=False):
665676
"""
666677
Flush the buffer and send log records to a remote server. The buffer is closed as well.
667678
668679
:return: None
669680
:rtype: None
670681
"""
671-
if not self.output.closed and self._nlines > 0:
672-
self.output.flush()
673-
buf = self.getValue()
674-
self.senderFunc(buf)
682+
if force or (self.output and self._nlines > 0):
683+
self.senderFunc(self.output)
675684
self._nlines = 0
676-
self.output.close()
677685

678686
def cancelTimer(self):
679687
"""
@@ -686,40 +694,32 @@ def cancelTimer(self):
686694
self._timer.cancel()
687695

688696

689-
def sendMessage(url, pilotUUID, wnVO, method, rawMessage):
690-
"""
691-
Invoke a remote method on a Tornado server and pass a JSON message to it.
692-
693-
:param str url: Server URL
694-
:param str pilotUUID: pilot unique ID
695-
:param str wnVO: VO name, relevant only if not contained in a proxy
696-
:param str method: a method to be invoked
697-
:param str rawMessage: a message to be sent, in JSON format
698-
:return: None.
699-
"""
700-
caPath = os.getenv("X509_CERT_DIR")
701-
cert = os.getenv("X509_USER_PROXY")
702-
703-
context = ssl.create_default_context()
704-
context.load_verify_locations(capath=caPath)
697+
def sendMessage(useServerCertificate, pilotUUID, rawMessage = []):
698+
cfg = []
699+
if useServerCertificate:
700+
cfg.append("-o /DIRAC/Security/UseServerCertificate=yes")
705701

706-
message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO))
702+
formatted_logs = json.dumps(rawMessage)
703+
704+
# Escape single quotes in JSON string for safe shell quoting
705+
safe_logs = formatted_logs.replace("'", "'\\''")
707706

708-
try:
709-
context.load_cert_chain(cert) # this is a proxy
710-
raw_data = {"method": method, "args": message}
711-
except IsADirectoryError: # assuming it'a dir containing cert and key
712-
context.load_cert_chain(os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem"))
713-
raw_data = {"method": method, "args": message, "extraCredentials": '"hosts"'}
714-
715-
if sys.version_info[0] == 3:
716-
data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3
717-
else:
718-
# Python2
719-
data = urlencode(raw_data)
707+
cmd = "dirac-admin-send-pilot-logs %s '%s' %s -d" % (
708+
pilotUUID,
709+
safe_logs,
710+
" ".join(cfg),
711+
)
720712

721-
res = urlopen(url, data, context=context)
722-
res.close()
713+
FNULL = open(os.devnull, 'w')
714+
_p = subprocess.Popen(
715+
cmd,
716+
shell=True,
717+
stdout=FNULL,
718+
stderr=FNULL,
719+
close_fds=False
720+
)
721+
_p.wait()
722+
FNULL.close()
723723

724724

725725
class CommandBase(object):
@@ -749,12 +749,12 @@ def __init__(self, pilotParams):
749749
# remote logger
750750
self.log = RemoteLogger(
751751
loggerURL,
752-
self.__class__.__name__,
752+
useServerCertificate=pilotParams.useServerCertificate,
753+
name=self.__class__.__name__,
753754
pilotUUID=pilotParams.pilotUUID,
754755
debugFlag=self.debugFlag,
755756
flushInterval=interval,
756757
bufsize=bufsize,
757-
wnVO=pilotParams.wnVO,
758758
)
759759

760760
self.log.isPilotLoggerOn = isPilotLoggerOn
@@ -794,8 +794,12 @@ def executeAndGetOutput(self, cmd, environDict=None):
794794
else:
795795
sys.stdout.write(outChunk)
796796
sys.stdout.flush()
797-
if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn:
798-
self.log.buffer.write(outChunk)
797+
if hasattr(self.log, "url"):
798+
# It's a remote logger
799+
self.log.buffer.write(self.log.format_to_json( # type: ignore
800+
"COMMAND",
801+
outChunk
802+
))
799803
outData += outChunk
800804
# If no data was read on any of the pipes then the process has finished
801805
if not dataWasRead:

0 commit comments

Comments
 (0)