Mercurial > hg-git-serve
view src/hgext3rd/hggit_serve/_ssh.py @ 14:959ef686193f
Add user-facing documentation.
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Fri, 20 Feb 2026 21:05:31 -0500 |
| parents | 00bdfac5416c |
| children | b65d5922b8ee |
line wrap: on
line source
# Mercurial commands that act as the server side of Git-over-SSH.
#
# Each command checks that the repository has Git data available, then hands
# control to the matching ``git`` plumbing program, pointing it at the
# repository's Git directory.

from __future__ import annotations

import subprocess
import typing as t

from mercurial import error as hgerr
from mercurial import registrar

from . import _export as xp

if t.TYPE_CHECKING:
    import mercurial.interfaces.repository as hgrepo
    import mercurial.ui as hgui

# Command table filled in by the ``@_command`` decorators below.
cmdtable: dict[bytes, object] = {}
_command = registrar.command(cmdtable)


def _not_git() -> hgerr.StateError:
    """Build the error raised when the repository is not set up for Git."""
    return hgerr.StateError(
        b'Git is not enabled for this repository.',
        hint=b'The server administrator should enable the ``hggit`` extension '
        b'and run ``hg git-export`` to enable Git access.',
    )


def _maybe(flag: bytes, include: bool) -> tuple[bytes, ...]:
    """Return ``(flag,)`` if ``include`` is true, otherwise an empty tuple.

    Intended to be splatted into an argv so that optional flags only show up
    when they were requested.
    """
    return (flag,) if include else ()


@_command(
    b'git-upload-pack',
    [
        # Boolean switches that are forwarded verbatim to ``git upload-pack``.
        *(
            (b'', opt, False, b'flag passed to git upload-pack')
            for opt in (
                b'strict',
                b'no-strict',
                b'stateless-rpc',
                b'advertise-refs',
            )
        ),
        # -1 is the "not given" sentinel; see the handler below.
        (b'', b'timeout', -1, b'flag passed to git upload-pack'),
    ],
    helpcategory=b'import',
    intents=(b'readonly',),
)
def _git_upload_pack(
    ui: hgui.ui,
    repo: hgrepo.IRepo,
    *,
    strict: bool,
    no_strict: bool,
    stateless_rpc: bool,
    advertise_refs: bool,
    timeout: int,
) -> int:
    """Server-side handler for ``git pull``/``git clone``."""
    # Returns the exit status of the spawned upload-pack process.
    # Raises StateError (via _not_git) when xp.is_gitty(repo) is false.
    if not xp.is_gitty(repo):
        raise _not_git()
    # Only forward --timeout when the caller actually supplied a value.
    timeout_flag = (
        (b'--timeout=' + str(timeout).encode(),) if timeout != -1 else ()
    )
    # The program to run is configurable via ``hggit-serve.upload-pack``.
    upload_pack = ui.configlist(
        b'hggit-serve', b'upload-pack', default=(b'git', b'upload-pack')
    )
    return subprocess.call(
        (
            *upload_pack,
            *_maybe(b'--strict', strict),
            *_maybe(b'--no-strict', no_strict),
            *_maybe(b'--stateless-rpc', stateless_rpc),
            # git spells this option with a hyphen; the underscore form is
            # rejected by ``git upload-pack`` as an unknown option.
            *_maybe(b'--advertise-refs', advertise_refs),
            *timeout_flag,
            repo.githandler.gitdir,
        ),
        close_fds=True,
    )


@_command(
    b'git-receive-pack',
    [
        (b'', b'skip-connectivity-check', False, b'passed to git receive-pack'),
    ],
    helpcategory=b'import',
)
def _git_receive_pack(
    ui: hgui.ui, repo: hgrepo.IRepo, *, skip_connectivity_check: bool
) -> int:
    """Server-side handler for ``git push``."""
    # Returns the receive-pack exit status on failure, or 0 after a
    # successful push followed by xp.import_all.
    # Raises StateError (via _not_git) when xp.is_gitty(repo) is false.
    if not xp.is_gitty(repo):
        raise _not_git()
    # The program to run is configurable via ``hggit-serve.receive-pack``.
    receive_pack = ui.configlist(
        b'hggit-serve', b'receive-pack', default=(b'git', b'receive-pack')
    )
    try:
        subprocess.check_call(
            (
                *receive_pack,
                *_maybe(b'--skip-connectivity-check', skip_connectivity_check),
                repo.githandler.gitdir,
            ),
            close_fds=True,
        )
    except subprocess.CalledProcessError as cpe:
        # Propagate git's own exit status and skip the import step.
        return cpe.returncode
    # NOTE(review): presumably brings the newly received Git data into the
    # Mercurial repository -- confirm against _export.import_all.
    xp.import_all(repo, b'git-receive-pack')
    return 0


__all__ = ('cmdtable',)
