One Hat Cyber Team
Your IP :
216.73.216.180
Server IP :
162.240.179.46
Server :
Linux vps-14493116.nutrivittasaude.com.br 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64
Server Software :
Apache
PHP Version :
8.2.31
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
lib
/
python3.9
/
site-packages
/
cockpit
/
View File Name :
polkit.py
#
# Copyright (C) 2023 Red Hat, Inc.
# SPDX-License-Identifier: GPL-3.0-or-later

"""polkit authentication agent that answers PAM prompts through an askpass responder."""

import asyncio
import locale
import logging
import os
import pwd
from typing import Dict, List, Sequence, Tuple

from cockpit._vendor.ferny import AskpassHandler
from cockpit._vendor.systemd_ctypes import Variant, bus

# polkit ≥ 127 (Dec 2025) uses a socket-activated helper
HELPER_SOCKET = '/run/polkit/agent-helper.socket'
# older versions use a setuid helper binary;
# that path is valid on at least Debian, Fedora/RHEL, and Arch
HELPER_PATH = '/usr/lib/polkit-1/polkit-agent-helper-1'
# object path under which the agent object is exported and registered with polkit
AGENT_DBUS_PATH = '/PolkitAgent'

logger = logging.getLogger(__name__)

# (identity kind, identity details) pair as it appears in the `a(sa{sv})` argument of BeginAuthentication
Identity = Tuple[str, Dict[str, Variant]]


# https://www.freedesktop.org/software/polkit/docs/latest/eggdbus-interface-org.freedesktop.PolicyKit1.AuthenticationAgent.html
# Note that we don't implement the CancelAuthentication() API. pkexec gets called in a way that has no opportunity to
# cancel an ongoing authentication from the pkexec side. On the UI side cancellation is implemented via the standard
# asyncio process mechanism. If we ever need CancelAuthentication(), we could keep a cookie → get_current_task()
# mapping, but that method is not available for Python 3.6 yet.
class org_freedesktop_PolicyKit1_AuthenticationAgent(bus.Object):
    """polkit authentication agent object that forwards PAM prompts to an askpass responder.

    Authentication is delegated to a polkit helper: first the socket at
    ``HELPER_SOCKET``, then the executable at ``HELPER_PATH`` if the socket
    attempt raises `OSError`.  Prompts emitted by the helper are answered by
    calling ``responder.do_askpass()``.
    """

    def __init__(self, responder: AskpassHandler):
        """Remember the responder that will be asked for passwords.

        :param responder: object whose ``do_askpass()`` coroutine answers PAM prompts
        """
        super().__init__()
        self.responder = responder

    # confusingly named: this actually does the whole authentication dialog, see docs
    @bus.Interface.Method('', ['s', 's', 's', 'a{ss}', 's', 'a(sa{sv})'])
    async def begin_authentication(self, action_id: str, message: str, icon_name: str,
                                   details: Dict[str, str], cookie: str, identities: Sequence[Identity]) -> None:
        """Run one complete authentication conversation for the given cookie.

        Only `cookie` and `identities` influence the flow; the remaining
        arguments are logged for debugging.  If none of `identities` is a
        ``unix-user`` whose ``uid`` equals our effective uid, a warning is
        logged and the method returns without contacting any helper.

        :param action_id: polkit action being authorized (logged only)
        :param message: message supplied with the request (logged only)
        :param icon_name: icon name supplied with the request (logged only)
        :param details: extra key/value details (logged only)
        :param cookie: opaque cookie handed on to the helper
        :param identities: identities that polkit would accept for this request
        """
        logger.debug('BeginAuthentication: action %s, message "%s", icon %s, details %s, cookie %s, identities %r',
                     action_id, message, icon_name, details, cookie, identities)

        # only support authentication as ourselves, as we don't yet have the
        # protocol plumbing and UI to select an admin user
        my_uid = os.geteuid()
        for (auth_type, subject) in identities:
            if auth_type == 'unix-user' and 'uid' in subject and subject['uid'].value == my_uid:
                logger.debug('Authentication subject %s matches our uid %d', subject, my_uid)
                break
        else:
            # loop finished without `break`: no identity matched
            logger.warning('Not supporting authentication as any of %s', identities)
            return

        user_name = pwd.getpwuid(my_uid).pw_name

        # Try socket-activated helper first (polkit >= 127), fall back to legacy setuid helper
        try:
            await self._authenticate_socket(user_name, cookie)
        except OSError:
            # Socket not available, fall back to legacy setuid helper
            # NOTE(review): this also catches an OSError raised after the connection was established
            # (while talking to the helper or closing the socket) — confirm that falling back is wanted then
            logger.debug('Socket helper not available, falling back to legacy helper')
            await self._authenticate_suid_helper(user_name, cookie)

    async def _authenticate_suid_helper(self, user_name: str, cookie: str) -> None:
        """Authenticate using legacy setuid helper binary

        Spawns ``HELPER_PATH`` with `user_name` and `cookie` as arguments and
        runs the prompt conversation over its stdin/stdout.  The process is
        always terminated and waited for, whether the conversation finished,
        was cancelled, or failed.
        """
        logger.debug('Trying legacy polkit helper at %s', HELPER_PATH)
        process = await asyncio.create_subprocess_exec(HELPER_PATH, user_name, cookie,
                                                       stdin=asyncio.subprocess.PIPE,
                                                       stdout=asyncio.subprocess.PIPE)
        assert process.stdin is not None
        assert process.stdout is not None
        try:
            await self._communicate(process.stdin, process.stdout)
        except asyncio.CancelledError:
            # covers both task cancellation and the "no password given" case raised by _communicate()
            logger.debug('Cancelled authentication')
        finally:
            try:
                process.terminate()
            except ProcessLookupError:
                pass  # process already exited and was reaped
            res = await process.wait()
            logger.debug('helper exited with code %i', res)

    async def _authenticate_socket(self, user_name: str, cookie: str) -> None:
        """Authenticate using socket-activated helper (polkit >= 127)

        Connects to ``HELPER_SOCKET``, sends `user_name` and `cookie` each on
        their own line, then runs the prompt conversation over the connection.
        The connection is always closed afterwards.

        :raises OSError: propagated from connecting to, talking to, or closing the socket
        """
        logger.debug('Trying socket-activated polkit helper at %s', HELPER_SOCKET)
        reader, writer = await asyncio.open_unix_connection(HELPER_SOCKET)
        try:
            # Send username and cookie followed by newline
            writer.write(f'{user_name}\n{cookie}\n'.encode())
            await writer.drain()
            await self._communicate(writer, reader)
        except asyncio.CancelledError:
            # covers both task cancellation and the "no password given" case raised by _communicate()
            logger.debug('Cancelled authentication')
        finally:
            writer.close()
            await writer.wait_closed()
            logger.debug('socket connection closed')

    async def _communicate(self, stdin: asyncio.StreamWriter, stdout: asyncio.StreamReader) -> None:
        """Drive the line-based conversation with a helper until it ends.

        Each line read from `stdout` is split at the first space into a
        command and a value:

        - ``PAM_PROMPT*``: ask the responder (passing along any accumulated
          messages) and write the answer plus a newline to `stdin`;
        - ``PAM_TEXT_INFO`` / ``PAM_ERROR``: remember the value for the next prompt;
        - ``SUCCESS`` / ``FAILURE`` / anything else: stop reading.

        :param stdin: stream the answers are written to
        :param stdout: stream the helper's lines are read from
        :raises asyncio.CancelledError: if the responder returns None for a prompt
        """
        messages: List[str] = []

        async for line in stdout:
            logger.debug('Read line from helper: %s', line)
            command, _, value = line.strip().decode().partition(' ')

            # usually: PAM_PROMPT_ECHO_OFF Password: \n
            if command.startswith('PAM_PROMPT'):
                # Don't pass this to the UI if it's "Password" (the usual case),
                # so that superuser.py uses the translated default
                if value.startswith('Password'):
                    value = ''

                # flush out accumulated info/error messages
                passwd = await self.responder.do_askpass('\n'.join(messages), value, '')
                messages.clear()
                if passwd is None:
                    logger.debug('got PAM_PROMPT %s, but do_askpass returned None', value)
                    raise asyncio.CancelledError('no password given')
                logger.debug('got PAM_PROMPT %s, do_askpass returned a password', value)
                stdin.write(passwd.encode())
                stdin.write(b'\n')
                del passwd  # don't keep this around longer than necessary
                await stdin.drain()
                logger.debug('got PAM_PROMPT, wrote password to helper')
            elif command in ('PAM_TEXT_INFO', 'PAM_ERROR'):
                messages.append(value)
            elif command == 'SUCCESS':
                logger.debug('Authentication succeeded')
                break
            elif command == 'FAILURE':
                logger.warning('Authentication failed')
                break
            else:
                logger.warning('Unknown line from helper, aborting: %s', line)
                break


class PolkitAgent:
    """Register polkit agent when required

    Use this as a context manager to ensure that the agent gets unregistered again.

    Registration is skipped (and the context manager becomes a no-op) when the
    system bus cannot be reached or ``XDG_SESSION_ID`` is not set.
    """

    def __init__(self, responder: AskpassHandler):
        """Store the responder; nothing is registered until the context is entered.

        :param responder: handed to the agent object to answer PAM prompts
        """
        self.responder = responder
        # slot returned by add_object() while the agent object is exported, otherwise None
        self.agent_slot = None

    async def __aenter__(self):
        """Export the agent object and register it with polkit for our session.

        :returns: self, whether or not an agent was registered
        :raises: whatever the RegisterAuthenticationAgent call raises; the
            exported object is withdrawn again before the exception propagates
        """
        try:
            self.system_bus = bus.Bus.default_system()
        except OSError as e:
            logger.warning('cannot connect to system bus, not registering polkit agent: %s', e)
            return self

        try:
            # may refine that with a D-Bus call to logind
            self.subject = ('unix-session', {'session-id': Variant(os.environ['XDG_SESSION_ID'], 's')})
        except KeyError:
            logger.debug('XDG_SESSION_ID not set, not registering polkit agent')
            return self

        agent_object = org_freedesktop_PolicyKit1_AuthenticationAgent(self.responder)
        self.agent_slot = self.system_bus.add_object(AGENT_DBUS_PATH, agent_object)

        # register agent
        locale_name = locale.setlocale(locale.LC_MESSAGES, None)
        try:
            await self.system_bus.call_method_async(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority',
                'org.freedesktop.PolicyKit1.Authority',
                'RegisterAuthenticationAgent',
                '(sa{sv})ss',
                self.subject, locale_name, AGENT_DBUS_PATH)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so withdraw the
            # exported object here instead of leaking it
            self.agent_slot.cancel()
            self.agent_slot = None
            raise
        logger.debug('Registered agent for %r and locale %s', self.subject, locale_name)
        return self

    async def __aexit__(self, _exc_type, _exc_value, _traceback):
        """Unregister the agent from polkit and withdraw the exported object.

        Does nothing if no agent was registered on entry.  The exported object
        is withdrawn even when the unregister call fails; that failure still
        propagates to the caller.
        """
        if self.agent_slot:
            try:
                await self.system_bus.call_method_async(
                    'org.freedesktop.PolicyKit1',
                    '/org/freedesktop/PolicyKit1/Authority',
                    'org.freedesktop.PolicyKit1.Authority',
                    'UnregisterAuthenticationAgent',
                    '(sa{sv})s',
                    self.subject, AGENT_DBUS_PATH)
            finally:
                # always release the slot, even if polkit could not be told about it
                self.agent_slot.cancel()
                self.agent_slot = None
            logger.debug('Unregistered agent for %r', self.subject)