# --- Consumer setup ---------------------------------------------------------
# Build a thread-based huey consumer and run it in a daemon thread so the
# demo steps below can enqueue work from the main thread.
client = huey.create_consumer(
    workers=4,
    worker_type=WORKER_THREAD,
    periodic=True,
    initial_delay=0.1,
    backoff=1.15,
    max_delay=2.0,
    scheduler_interval=1,
    check_worker_health=True,
    health_check_interval=10,
    flush_locks=False,
)
consumer_thread = threading.Thread(target=client.run, daemon=True)
consumer_thread.start()
print("Client began (threaded).")

# --- Basic enqueue ----------------------------------------------------------
# Calling a task returns a result handle; calling the handle with
# blocking=True waits (up to `timeout` seconds) for the value.
print("\nEnqueue fundamentals...")
r1 = quick_add(10, 32)
r2 = slow_io(0.75)
print("quick_add consequence:", r1(blocking=True, timeout=5))
print("slow_io consequence:", r2(blocking=True, timeout=5))

# --- Retries ----------------------------------------------------------------
print("\nRetries + precedence demo (flaky job)...")
rf = flaky_network_call(p_fail=0.7)
try:
    print("flaky_network_call consequence:", rf(blocking=True, timeout=10))
except Exception as e:
    # Broad catch is deliberate: this is a demo boundary and the failure
    # (task error or timeout) is reported rather than aborting the script.
    print("flaky_network_call failed even after retries:", repr(e))

# --- Context task -----------------------------------------------------------
print("\nContext job (job id inside payload)...")
rp = cpu_pi_estimate(samples=150_000)
print("pi payload:", rp(blocking=True, timeout=20))

# --- Locks ------------------------------------------------------------------
print("\nLocks demo: enqueue a number of locked jobs shortly (ought to serialize)...")
locked_results = [locked_sync_job(tag=f"run{i}") for i in range(3)]
print([res(blocking=True, timeout=10) for res in locked_results])

# --- Scheduling -------------------------------------------------------------
print("\nScheduling demo: run slow_io in ~3 seconds...")
rs = slow_io.schedule(args=(0.25,), delay=3)
print("scheduled deal with:", rs)
print("scheduled slow_io consequence:", rs(blocking=True, timeout=10))

# --- Revocation -------------------------------------------------------------
print("\nRevoke demo: schedule a job in 5s then revoke earlier than it runs...")
rv = slow_io.schedule(args=(0.1,), delay=5)
rv.revoke()
# Sleep past the scheduled time so the consumer has had its chance to
# (not) run the revoked task before we look for a result.
time.sleep(6)
try:
    out = rv(blocking=False)
    print("revoked job output:", out)
except Exception as e:
    print(
        "revoked job didn't produce consequence (anticipated):",
        type(e).__name__,
        str(e)[:120],
    )

# --- Pipeline ---------------------------------------------------------------
print("\nPipeline demo...")
pipeline = (
    fetch_number.s(123)
    .then(transform_number, 5)
    .then(store_result)
)
pipe_res = huey.enqueue(pipeline)
print("pipeline ultimate consequence:", pipe_res(blocking=True, timeout=10))

# --- Heartbeat --------------------------------------------------------------
print("\nStarting 15-second heartbeat demo for ~40 seconds...")
start_seconds_heartbeat(interval_sec=15)
time.sleep(40)
stop_seconds_heartbeat()
print("Stopped 15-second heartbeat demo.")
print_latest_events(12)

# --- Shutdown ---------------------------------------------------------------
print("\nStopping client gracefully...")
client.stop(graceful=True)
# Bounded join: the thread is a daemon, so do not hang forever on exit.
consumer_thread.join(timeout=5)
print("Client stopped.")
