RustNotify ¶
RustNotify(watch_paths: list[str], debug: bool, force_polling: bool, poll_delay_ms: int, recursive: bool, ignore_permission_denied: bool)
Interface to the Rust notify crate which does the heavy lifting of watching for file changes and grouping them into events.
Create a new RustNotify instance and start a thread to watch for changes.
FileNotFoundError is raised if any of the paths do not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
watch_paths | list[str] | file system paths to watch for changes, can be directories or files | required |
debug | bool | if true, print details about all events to stderr | required |
force_polling | bool | if true, always use polling instead of file system notifications | required |
poll_delay_ms | int | delay between polling for changes in milliseconds, only used if force_polling is true | required |
recursive | bool | if true, watch for changes in sub-directories recursively, otherwise watch only the top-level directory | required |
ignore_permission_denied | bool | if true, permission denied errors are ignored while watching for changes | required |
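All six constructor arguments are required and four of them are booleans, so a positional call is easy to get wrong. The sketch below shows one way to name them before passing them on. `NotifyOptions` and its default values are hypothetical and not part of watchfiles; the defaults simply mirror the values used in the direct-usage example on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NotifyOptions:
    """Hypothetical helper (not part of watchfiles) that names each argument."""

    watch_paths: list[str]
    debug: bool = False
    force_polling: bool = False
    poll_delay_ms: int = 0
    recursive: bool = True
    ignore_permission_denied: bool = False

    def as_args(self) -> tuple:
        # Same order as the RustNotify signature documented above.
        return (
            list(self.watch_paths),
            self.debug,
            self.force_polling,
            self.poll_delay_ms,
            self.recursive,
            self.ignore_permission_denied,
        )


options = NotifyOptions(['first/path', 'second/path'], recursive=True)
args = options.as_args()
print(args)
```

With watchfiles installed, `RustNotify(*args)` would then receive the arguments in the documented order.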
__enter__ ¶
Does nothing, but allows RustNotify to be used as a context manager.
Note
The watching thread is created when the instance is instantiated, not on __enter__.
close ¶
Stops the watching thread. After close is called, the RustNotify instance can no longer be used; calls to watch will raise a RuntimeError.
Note
close is not required; just deleting the RustNotify instance will kill the thread implicitly.
As per #163, close() is only required because, in the event of an error, the traceback in sys.exc_info keeps a reference to watchfiles.watch's frame, so you can't rely on the RustNotify object being deleted, and thereby stopping the watching thread.
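The mechanism described in the note can be reproduced without watchfiles. The sketch below assumes CPython and uses a hypothetical `Watcher` class as a stand-in for RustNotify: while an exception (and therefore its traceback) is kept alive, the frame that raised it still holds its local variables, so the object is not deleted.

```python
import gc
import weakref


class Watcher:
    """Hypothetical stand-in for an object that owns a background thread."""


refs = []


def run():
    watcher = Watcher()
    refs.append(weakref.ref(watcher))  # lets us observe when it is freed
    raise ValueError("boom")


try:
    run()
except ValueError as exc:
    # Keeping the exception keeps exc.__traceback__, which keeps run()'s
    # frame, which keeps the local variable `watcher`.
    saved = exc

gc.collect()
alive_while_traceback_kept = refs[0]() is not None

del saved  # drop the exception and, with it, the traceback and frame
gc.collect()
alive_after_traceback_dropped = refs[0]() is not None

print(alive_while_traceback_kept, alive_after_traceback_dropped)
```

Calling close explicitly, or using the instance as a context manager, avoids depending on when the object happens to be collected.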
watch ¶
watch(debounce_ms: int, step_ms: int, timeout_ms: int, stop_event: AbstractEvent | None) -> set[tuple[int, str]] | Literal['signal', 'stop', 'timeout']
Watch for changes.
This method will wait timeout_ms milliseconds for changes, but once a change is detected, it will group changes and return in no more than debounce_ms milliseconds.
The GIL is released during a step_ms sleep on each iteration to avoid blocking Python.
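To make the interaction between the three timings concrete, here is a simplified pure-Python model of the loop described above. It is a sketch of the documented behaviour, not the Rust implementation: `simulate_watch`, its virtual clock, the `arrivals` list and the `stop_at_ms` parameter are all assumptions made for illustration, and treating a `timeout_ms` of 0 as "no timeout" is likewise an assumption of this model.

```python
def simulate_watch(arrivals, debounce_ms, step_ms, timeout_ms, stop_at_ms=None):
    """Model of the watch loop using a virtual clock instead of real sleeps.

    arrivals: list of (time_ms, change) pairs saying when each change occurs.
    stop_at_ms: virtual time at which a stop event would become set.
    """
    pending = sorted(arrivals)
    changes = set()
    first_change_at = None
    now = 0
    while True:
        now += step_ms  # stands in for the GIL-free sleep of step_ms
        if stop_at_ms is not None and now >= stop_at_ms:
            return 'stop'
        new = {change for t, change in pending if t <= now}
        pending = [(t, change) for t, change in pending if t > now]
        if new:
            changes |= new
            if first_change_at is None:
                first_change_at = now
            if now - first_change_at >= debounce_ms:
                return changes  # grouped for as long as debounce_ms allows
        elif changes:
            return changes  # a quiet step after at least one change
        elif timeout_ms > 0 and now >= timeout_ms:
            return 'timeout'


# Two changes close together are grouped, then returned after a quiet step.
grouped = simulate_watch([(30, (1, 'a.txt')), (70, (2, 'b.txt'))], 1_600, 50, 0)
# A steady stream of changes is cut off once debounce_ms has elapsed.
capped = simulate_watch([(50 * i, (2, f'{i}.txt')) for i in range(1, 100)], 200, 50, 0)
# No changes at all: the timeout is reported instead.
timed_out = simulate_watch([], 1_600, 50, 100)
print(grouped, len(capped), timed_out)
```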
Parameters:
Name | Type | Description | Default |
---|---|---|---|
debounce_ms | int | maximum time in milliseconds to group changes over before returning | required |
step_ms | int | time to wait for new changes in milliseconds; if no changes are detected in this time, and at least one change has been detected, the changes are returned | required |
timeout_ms | int | maximum time in milliseconds to wait for changes before returning, 0 means wait indefinitely | required |
stop_event | AbstractEvent \| None | event to check on every iteration to see if this function should return early. The event should be an object which has an is_set() method which returns a boolean | required |
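As an illustration of what can be passed as stop_event, the sketch below assumes that the only requirement is an `is_set()` method returning a boolean, which `threading.Event` already satisfies. `StopEvent`, `DeadlineEvent` and `should_stop` are hypothetical names introduced here, not part of watchfiles.

```python
import threading
import time
from typing import Callable, Optional, Protocol


class StopEvent(Protocol):
    """Assumed shape of a stop event: anything with is_set() -> bool."""

    def is_set(self) -> bool: ...


class DeadlineEvent:
    """Hypothetical event that becomes set once a deadline has passed."""

    def __init__(self, seconds: float, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._deadline = clock() + seconds

    def is_set(self) -> bool:
        return self._clock() >= self._deadline


def should_stop(event: Optional[StopEvent]) -> bool:
    # Mirrors the per-iteration check: None means "never stop early".
    return event is not None and event.is_set()


flag = threading.Event()
print(should_stop(None), should_stop(flag))
flag.set()
print(should_stop(flag))
```

Either object could then be passed as the fourth argument to watch in place of None.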
Returns:
Type | Description |
---|---|
set[tuple[int, str]] \| Literal['signal', 'stop', 'timeout'] | See below. |
Return values have the following meanings:
- Change details as a set of (event_type, path) tuples; the event types are ints which match Change, and path is a string representing the path of the file that changed
- 'signal' string, if a signal was received
- 'stop' string, if the stop_event was set
- 'timeout' string, if timeout_ms was exceeded
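Because the result is either a set or one of three strings, callers need to branch on its type. The sketch below shows one way to do that. The local `Change` enum is a stand-in so the example runs without watchfiles; its member names and values are assumed to mirror `watchfiles.Change`, and `describe` is a hypothetical helper.

```python
from enum import IntEnum


class Change(IntEnum):
    """Local stand-in; values assumed to mirror watchfiles.Change."""

    added = 1
    modified = 2
    deleted = 3


def describe(result):
    """Turn a watch() result into sorted, human-readable lines."""
    if isinstance(result, str):
        # One of 'signal', 'stop' or 'timeout': no change details available.
        return [f'no changes: {result}']
    return sorted(f'{Change(kind).name}: {path}' for kind, path in result)


lines = describe({(1, 'a.txt'), (3, 'b.txt')})
print(lines)
print(describe('timeout'))
```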
WatchfilesRustInternalError ¶
Bases: RuntimeError
Raised when RustNotify encounters an unknown error.
If you get this a lot, please check the GitHub issues and create a new issue if your problem is not discussed.
__version__ module-attribute ¶
The package version as defined in Cargo.toml, modified to match Python's versioning semantics.
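Cargo writes pre-release versions with a hyphen (for example `1.2.3-beta.1`), while Python's PEP 440 writes them as `1.2.3b1`. The sketch below shows what such a conversion could look like. `cargo_to_pep440` is hypothetical and is not the conversion watchfiles itself performs; it only illustrates the kind of difference the sentence above refers to.

```python
import re

# Cargo pre-release labels mapped to their PEP 440 spellings.
_PRE_RELEASE = {'alpha': 'a', 'beta': 'b', 'rc': 'rc'}

_CARGO_VERSION = re.compile(r'(\d+\.\d+\.\d+)(?:-(alpha|beta|rc)\.?(\d+)?)?')


def cargo_to_pep440(version: str) -> str:
    """Hypothetical conversion of a simple Cargo version to PEP 440 form."""
    match = _CARGO_VERSION.fullmatch(version)
    if match is None:
        raise ValueError(f'unsupported version: {version!r}')
    release, label, number = match.groups()
    if label is None:
        return release  # plain releases are already valid in both schemes
    return f'{release}{_PRE_RELEASE[label]}{number or 0}'


print(cargo_to_pep440('1.2.3'))
print(cargo_to_pep440('1.2.3-beta.1'))
```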
Rust backend direct usage¶
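The Rust backend can be accessed directly as follows:

    from watchfiles._rust_notify import RustNotify

    # positional arguments: watch_paths, debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied
    r = RustNotify(['first/path', 'second/path'], False, False, 0, True, False)
    # positional arguments: debounce_ms, step_ms, timeout_ms, stop_event
    changes = r.watch(1_600, 50, 100, None)
    print(changes)
    r.close()
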
Or using RustNotify as a context manager:

    from watchfiles._rust_notify import RustNotify

    with RustNotify(['first/path', 'second/path'], False, False, 0, True, False) as r:
        changes = r.watch(1_600, 50, 100, None)
        print(changes)

(See the documentation on close above for when and why the context manager or close method is required.)