-
Notifications
You must be signed in to change notification settings - Fork 236
Expand file tree
/
Copy pathutil.py
More file actions
217 lines (179 loc) · 6.49 KB
/
util.py
File metadata and controls
217 lines (179 loc) · 6.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
"""Utility and helper methods for tmuxp."""
from __future__ import annotations
import logging
import os
import shlex
import subprocess
import sys
import typing as t
from . import exc
from .log import tmuxp_echo
if t.TYPE_CHECKING:
import pathlib
from libtmux.pane import Pane
from libtmux.server import Server
from libtmux.session import Session
from libtmux.window import Window
logger = logging.getLogger(__name__)
PY2 = sys.version_info[0] == 2
def run_before_script(
script_file: str | pathlib.Path,
cwd: pathlib.Path | None = None,
on_line: t.Callable[[str], None] | None = None,
) -> int:
"""Execute shell script, streaming output to callback or terminal (if TTY).
Output is buffered and optionally forwarded via the ``on_line`` callback.
"""
script_cmd = shlex.split(str(script_file))
try:
proc = subprocess.Popen(
script_cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, # decode to str
errors="backslashreplace",
)
except FileNotFoundError as e:
raise exc.BeforeLoadScriptNotExists(
e,
os.path.abspath(script_file), # NOQA: PTH100
) from e
out_buffer = []
err_buffer = []
# While process is running, read lines from stdout/stderr
# and write them to this process's stdout/stderr if isatty
is_out_tty = sys.stdout.isatty()
is_err_tty = sys.stderr.isatty()
# You can do a simple loop reading in real-time:
while True:
# Use .poll() to check if the child has exited
return_code = proc.poll()
# Read one line from stdout, if available
line_out = proc.stdout.readline() if proc.stdout else ""
# Read one line from stderr, if available
line_err = proc.stderr.readline() if proc.stderr else ""
if line_out and line_out.strip():
out_buffer.append(line_out)
if on_line is not None:
on_line(line_out)
elif is_out_tty:
sys.stdout.write(line_out)
sys.stdout.flush()
if line_err and line_err.strip():
err_buffer.append(line_err)
if on_line is not None:
on_line(line_err)
elif is_err_tty:
sys.stderr.write(line_err)
sys.stderr.flush()
# If no more data from pipes and process ended, break
if not line_out and not line_err and return_code is not None:
break
# At this point, the process has finished
return_code = proc.wait()
if return_code != 0:
# Join captured stderr lines for your exception
stderr_str = "".join(err_buffer).strip()
raise exc.BeforeLoadScriptError(
return_code,
os.path.abspath(script_file), # NOQA: PTH100
stderr_str,
)
return return_code
def oh_my_zsh_auto_title() -> None:
"""Give warning and offer to fix ``DISABLE_AUTO_TITLE``.
See: https://github.com/robbyrussell/oh-my-zsh/pull/257
"""
if (
"SHELL" in os.environ
and "zsh" in os.environ.get("SHELL", "")
and os.path.exists(os.path.expanduser("~/.oh-my-zsh")) # NOQA: PTH110, PTH111
and (
"DISABLE_AUTO_TITLE" not in os.environ
or os.environ.get("DISABLE_AUTO_TITLE") == "false"
)
):
logger.warning("oh-my-zsh DISABLE_AUTO_TITLE not set")
tmuxp_echo(
"oh-my-zsh DISABLE_AUTO_TITLE not set.\n\n"
"Please set:\n\n"
"\texport DISABLE_AUTO_TITLE='true'\n\n"
"in ~/.zshrc or where your zsh profile is stored.\n"
'Remember the "export" at the beginning!\n\n'
"Then create a new shell or type:\n\n"
"\t$ source ~/.zshrc",
)
def get_current_pane(server: Server) -> Pane | None:
"""Return Pane if one found in env."""
if os.getenv("TMUX_PANE") is not None:
try:
return next(p for p in server.panes if p.pane_id == os.getenv("TMUX_PANE"))
except StopIteration:
pass
return None
def get_session(
server: Server,
session_name: str | None = None,
current_pane: Pane | None = None,
) -> Session:
"""Get tmux session for server by session name, respects current pane, if passed."""
try:
if session_name:
session = server.sessions.get(session_name=session_name)
elif current_pane is not None:
session = server.sessions.get(session_id=current_pane.session_id)
else:
current_pane = get_current_pane(server)
if current_pane:
session = server.sessions.get(session_id=current_pane.session_id)
else:
session = server.sessions[0]
except Exception as e:
if session_name:
raise exc.SessionNotFound(session_name) from e
raise exc.SessionNotFound from e
assert session is not None
return session
def get_window(
session: Session,
window_name: str | None = None,
current_pane: Pane | None = None,
) -> Window:
"""Get tmux window for server by window name, respects current pane, if passed."""
try:
if window_name:
window = session.windows.get(window_name=window_name)
elif current_pane is not None:
window = session.windows.get(window_id=current_pane.window_id)
else:
window = session.windows[0]
except Exception as e:
if window_name:
raise exc.WindowNotFound(window_target=window_name) from e
if current_pane:
raise exc.WindowNotFound(window_target=str(current_pane)) from e
raise exc.WindowNotFound from e
assert window is not None
return window
def get_pane(window: Window, current_pane: Pane | None = None) -> Pane:
"""Get tmux pane for server by pane name, respects current pane, if passed."""
pane = None
try:
if current_pane is not None:
pane = window.panes.get(pane_id=current_pane.pane_id)
else:
pane = window.active_pane
except Exception as e:
logger.debug(
"pane lookup failed",
extra={"tmux_pane": str(current_pane) if current_pane else ""},
)
if current_pane:
raise exc.PaneNotFound(str(current_pane)) from e
raise exc.PaneNotFound from e
if pane is None:
if current_pane:
raise exc.PaneNotFound(str(current_pane))
raise exc.PaneNotFound
return pane