Mario Vormstein
01/09/2023, 4:00 PMfrom prefect import flow, get_run_logger, task
@flow(name='flow_from_jupyter')
def do_something():
    logger = get_run_logger()
    logger.info('Executing from Jupyter')
    logger.info('More logs')
and when I re-evaluate the cell, I get a UserWarning:
/opt/conda/lib/python3.10/site-packages/prefect/flows.py:206: UserWarning: A flow named 'flow_from_jupyter' and defined at '/tmp/ipykernel_166/4031630764.py:2' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
Everything still works, but the warning is annoying to users. Is there a real problem, and could this warning be suppressed somehow?Nimesh Kumar
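One way to quiet the duplicate-name UserWarning is a targeted warnings filter. This sketch does not use Prefect at all: `redefine_flow` is a hypothetical stand-in that emits the same kind of warning as re-evaluating the `@flow` cell, so only the filter pattern carries over to the real setup.

```python
import warnings

def suppress_flow_conflict_warnings():
    # Ignore only the duplicate-flow-name warning; the message pattern is
    # taken from the warning text quoted above.
    warnings.filterwarnings(
        "ignore",
        message=r".*conflicts with another flow.*",
        category=UserWarning,
    )

def redefine_flow():
    # Hypothetical stand-in for re-evaluating the @flow cell in Jupyter.
    warnings.warn(
        "A flow named 'flow_from_jupyter' conflicts with another flow.",
        UserWarning,
    )

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("default")
    suppress_flow_conflict_warnings()
    redefine_flow()                                   # filtered out
    warnings.warn("unrelated warning", UserWarning)   # still recorded
```

The filter is scoped by message text, so other UserWarnings stay visible.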
01/09/2023, 5:09 PMLaraib Siddiqui
01/10/2023, 7:41 AMfrom prefect import flow, task
from prefect_shell import shell_run_command
from prefect_dask.task_runners import DaskTaskRunner

@task
def tele_caller():
    return shell_run_command(
        command="python3 /data-analytics/telecaller_leads.py -r 0",
        return_all=True,
    )

@task
def teleconsultation():
    return shell_run_command(
        command="python3 /data-analytics/teleconsultation.py -r 0",
        return_all=True,
    )

@task
def user_profiles():
    return shell_run_command(
        command="python3 /data-analytics/user_profiles_created_last_24_hours.py -r 0",
        return_all=True,
    )

@task
def error_pages():
    return shell_run_command(
        command="python3 /data-analytics/error_pages_data.py -r 0",
        return_all=True,
    )

@task
def non_del():
    return shell_run_command(
        command="python3 /data-analytics/non_delivered_orders.py -r 0",
        return_all=True,
    )

@task
def non_ship():
    return shell_run_command(
        command="python3 /data-analytics/non_shipped_orders.py -r 0",
        return_all=True,
    )

@task
def gmd_review():
    return shell_run_command(
        command="python3 /data-analytics/good_md_content_data.py -r 0",
        return_all=True,
    )

@task
def gmd_content():
    return shell_run_command(
        command="python3 /data-analytics/good_md_review_data.py -r 0",
        return_all=True,
    )

@flow(task_runner=DaskTaskRunner())
def final_docker_run():
    user_profiles.submit()
    tele_caller.submit()
    teleconsultation.submit()
    non_del.submit()
    non_ship.submit()
    error_pages.submit()
    gmd_content.submit()
    gmd_review.submit()

if __name__ == '__main__':
    final_docker_run()
    # print(final_shell_run())
    # print(example_shell_run_command_flow())
Laraib Siddiqui
01/10/2023, 7:42 AMfrom prefect import flow, task
from prefect_shell import shell_run_command
from prefect_dask.task_runners import DaskTaskRunner

def tele_caller():
    return shell_run_command(
        command="python3 /data-analytics/telecaller_leads.py -r 0",
        return_all=True,
    )

def teleconsultation():
    return shell_run_command(
        command="python3 /data-analytics/teleconsultation.py -r 0",
        return_all=True,
    )

def user_profiles():
    return shell_run_command(
        command="python3 /data-analytics/user_profiles_created_last_24_hours.py -r 0",
        return_all=True,
    )

def error_pages():
    return shell_run_command(
        command="python3 /data-analytics/error_pages_data.py -r 0",
        return_all=True,
    )

def non_del():
    return shell_run_command(
        command="python3 /data-analytics/non_delivered_orders.py -r 0",
        return_all=True,
    )

def non_ship():
    return shell_run_command(
        command="python3 /data-analytics/non_shipped_orders.py -r 0",
        return_all=True,
    )

def gmd_review():
    return shell_run_command(
        command="python3 /data-analytics/good_md_content_data.py -r 0",
        return_all=True,
    )

def gmd_content():
    return shell_run_command(
        command="python3 /data-analytics/good_md_review_data.py -r 0",
        return_all=True,
    )

@flow(task_runner=DaskTaskRunner())
def final_docker_run():
    # These are plain functions (no @task decorator), so they are called
    # directly rather than submitted.
    user_profiles()
    tele_caller()
    teleconsultation()
    non_del()
    non_ship()
    error_pages()
    gmd_content()
    gmd_review()

if __name__ == '__main__':
    final_docker_run()
    # print(final_shell_run())
    # print(example_shell_run_command_flow())
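Setting Prefect aside, the fan-out the flow above performs (run several shell commands concurrently and collect each one's output lines, like `return_all=True`) can be sketched with the standard library alone. The commands here are trivial placeholders for the real scripts under /data-analytics/:

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Placeholder commands; the real scripts' paths are assumed unavailable here.
COMMANDS = [
    [sys.executable, "-c", "print('tele_caller')"],
    [sys.executable, "-c", "print('error_pages')"],
]

def run_script(cmd):
    # Run one command and return all of its stdout lines,
    # mirroring shell_run_command(..., return_all=True).
    out = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return out.stdout.splitlines()

# Submit every command at once; map preserves input order in the results.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(run_script, COMMANDS))
```

A task runner such as Dask does essentially this, plus scheduling and observability.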
Idan
01/10/2023, 3:21 PMos.seteuid(priv_user_id) (and then later resets to original_user_id). This is run in a container, so the original_user_id is root, which entails PREFECT_HOME is /root/.prefect/.
Our tasks are long-running, so we also cache them. Then, every now and then, a task succeeds but the run then crashes with:
Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
File "/usr/lib/python3.10/pathlib.py", line 1175, in mkdir
self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/root/.prefect/storage'
During handling of the above exception, another exception occurred:
PermissionError: [Errno 13] Permission denied: '/root/.prefect/storage'
Any smart ideas how to facilitate both needs?Kalise Richmond
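One workaround (an assumption about the cause, not a confirmed fix) is to create the result-storage directory while still running as root, before `seteuid`, and open its permissions so the de-privileged user can write to it. A sketch, with a temp directory standing in for /root/.prefect:

```python
import stat
import tempfile
from pathlib import Path

def prepare_storage(prefect_home: Path) -> Path:
    # Create PREFECT_HOME/storage up front, while still privileged, and
    # open its permissions so a de-privileged uid can write result files.
    storage = prefect_home / "storage"
    storage.mkdir(parents=True, exist_ok=True)
    storage.chmod(
        storage.stat().st_mode
        | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH
    )
    return storage

# Temp directory stands in for /root/.prefect in this sketch.
home = Path(tempfile.mkdtemp())
storage = prepare_storage(home)
```

Alternatively, pointing PREFECT_HOME at a directory owned by the de-privileged user avoids the permission mismatch entirely.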
01/10/2023, 5:59 PMRicky Zhang
01/10/2023, 8:44 PMNils
01/11/2023, 1:24 PMrun_deployment? I have a setup similar to the following, but when I pass the pydantic model I get some weird behavior.
from prefect import flow
from prefect.deployments import run_deployment
from pydantic import BaseModel

class Config(BaseModel):
    ...

@flow(name="subflow")
def subflow(config: Config = Config()):
    do_something()

@flow
def main_flow(config: Config = Config()):
    run_deployment(name="subflow", parameters={"config": config})
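One common source of odd behavior when shipping parameters across a deployment boundary is passing a model instance where a JSON-serializable dict is expected; for pydantic v1 that means serializing with `config.dict()` first. Whether that is the issue here is an assumption. A stdlib sketch of the serialize-first pattern, with a dataclass standing in for the pydantic model:

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class Config:
    # Stand-in for the pydantic BaseModel above; fields are illustrative.
    threshold: int = 10
    name: str = "default"

def to_parameters(config: Config) -> dict:
    # Convert the config object into a plain dict and verify it is
    # JSON-serializable before handing it over as flow parameters.
    params = {"config": asdict(config)}
    json.dumps(params)  # raises TypeError if anything is not serializable
    return params

params = to_parameters(Config())
```

The receiving flow can then rebuild the model from the dict on its side.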
Kate Weber
01/11/2023, 2:37 PMmain() function with a ConcurrentTaskRunner, but everything just went on hold until the Flask job was killed.
Is there a better way to do this? Should I dockerise the Flask server and use the docker module in Python to start and stop it? Is using a DockerContainer to start it going to block other tasks? Other suggestions?Nimesh Kumar
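One way to keep a server from blocking the rest of the flow, short of Docker, is to launch it as a child process and terminate it once the dependent tasks finish. A sketch in which a sleeping child stands in for the Flask server (the real command, e.g. `flask run`, is assumed):

```python
import subprocess
import sys

def start_server(cmd):
    # Launch a long-running process without blocking the caller;
    # the flow can keep submitting tasks while the child runs.
    return subprocess.Popen(cmd)

# Placeholder for the Flask server: a child that just sleeps.
proc = start_server([sys.executable, "-c", "import time; time.sleep(60)"])
assert proc.poll() is None  # still running, so the caller was not blocked

# Shut the server down once the tasks that depend on it are finished.
proc.terminate()
proc.wait(timeout=10)
```

Wrapping the start/stop pair in a context manager makes cleanup robust against task failures.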
01/13/2023, 12:05 PMJohn Lewis
01/13/2023, 7:37 PMDaskTaskRunner configured to access an existing Dask cluster at my-dask-cluster.
@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
By the way, when I attempted using "http", I got this message:
ValueError: unknown address scheme 'http'
I chose tcp and 8786 based on this line I see in my dask scheduler's log:
distributed.scheduler - INFO - Scheduler at: tcp://10.254.229.85:8786
I exposed the port:
k expose pod mycluster-scheduler --port 8786 --name mycluster
Unfortunately, when I try to run the python script, I see:
OSError: Timed out trying to connect to tcp://hostname:8786 after 30 s
Am I doing something obviously wrong? Any tips for troubleshooting?Rohit Motiani
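Before debugging Dask itself, it can help to confirm the scheduler port is even reachable from the machine running the flow; a plain TCP probe does that. The demo below probes a local listener; against the cluster above you would call e.g. `port_is_open("10.254.229.85", 8786)`:

```python
import socket

def port_is_open(host, port, timeout=5.0):
    # Plain TCP probe: True if a connection to host:port succeeds in time.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demo against a throwaway local listener on an ephemeral port.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]
reachable = port_is_open("127.0.0.1", port)
server.close()
```

If the probe times out, the problem is network reachability (Service/port exposure in Kubernetes), not the task runner configuration.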
01/15/2023, 6:11 PMMario Vormstein
01/16/2023, 2:22 PM@task decorator, and I can see in the UI that a flow can have tags as well. But the @flow decorator does not have such an option?John Lewis
01/16/2023, 10:17 PMBryce Morrow
01/17/2023, 6:29 PMAaron Pritchard
01/18/2023, 3:57 AMEmil Ostergaard
01/18/2023, 10:27 AMimport datetime as dt
import asyncio

from prefect import task, flow
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=dt.timedelta(minutes=1))
async def get_token(key):
    return key

async def build_subflow(n):
    @flow(retries=1, retry_delay_seconds=61)
    async def subflow(id):
        token = await get_token(dt.datetime.now().strftime('%Y-%m-%d %H:%M'))
        print(f'Id: {id}-{token}')
        raise Exception('ERROR')
    await subflow(n)

@flow(log_prints=False)
async def main_flow():
    await asyncio.create_task(build_subflow(1))

if __name__ == '__main__':
    main_flow_state = asyncio.run(main_flow())
Current behavior:
10:50:05.123 | INFO | prefect.engine - Created flow run 'magnificent-partridge' for flow 'main-flow'
10:50:07.225 | INFO | Flow run 'magnificent-partridge' - Created subflow run 'keen-galago' for flow 'subflow'
10:50:07.868 | INFO | Flow run 'keen-galago' - Created task run 'get_token-5531366f-0' for task 'get_token'
10:50:07.869 | INFO | Flow run 'keen-galago' - Executing 'get_token-5531366f-0' immediately...
10:50:08.738 | INFO | Task run 'get_token-5531366f-0' - Finished in state Completed()
10:50:08.740 | ERROR | Flow run 'keen-galago' - Encountered exception during execution:
...
Id: 1-2023-01-18 10:50
10:50:08.995 | INFO | Flow run 'keen-galago' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
10:51:10.832 | INFO | Flow run 'keen-galago' - Created task run 'get_token-5531366f-0' for task 'get_token'
10:51:10.833 | INFO | Flow run 'keen-galago' - Executing 'get_token-5531366f-0' immediately...
10:51:11.440 | INFO | Task run 'get_token-5531366f-0' - Task run '53b5da1b-e807-4db2-8404-633b88c94c8d' already finished.
Id: 1-2023-01-18 10:50
10:51:11.840 | ERROR | Flow run 'keen-galago' - Encountered exception during execution:
...
So basically I see this behavior because of Task run 'get_token-5531366f-0' - Task run '53b5da1b-e807-4db2-8404-633b88c94c8d' already finished. Is there any way to force a re-run?
Thanks in advance!Ian
01/19/2023, 7:55 PMNimesh Kumar
01/20/2023, 12:27 PMprocessor : 3
vendor_id : GenuineIntel
cpu family : 6
model : 158
model name : Intel(R) Core(TM) i5-7500 CPU @ 3.40GHz
stepping : 9
microcode : 0xea
cpu MHz : 800.086
cache size : 6144 KB
physical id : 0
siblings : 4
core id : 3
cpu cores : 4
apicid : 6
initial apicid : 6
cache_alignment : 64
address sizes : 39 bits physical, 48 bits virtual
My questions:
1. Does the number of concurrent flows depend on the hardware configuration (threads, cores, etc.)?
a. If not, on what basis should I select the number of concurrent flows for smooth processing?
b. Will there be any case where I get an out-of-memory error?
c. Also, how can I select the number of parallel flows?
THANKSGaetan
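Concurrency does interact with hardware: CPU-bound flows are limited by core count, and memory use grows with the number of simultaneous runs, so out-of-memory errors are possible if each run's footprint times the concurrency exceeds available RAM. A common starting heuristic (an assumption to be tuned by measurement, not a Prefect rule) derives the limit from the CPU count:

```python
import os

def suggested_concurrency(io_bound: bool = True) -> int:
    # Heuristic only: CPU-bound work is usually capped near the core
    # count, while I/O-bound flows tolerate a small multiple of it.
    # Memory per run still has to be measured separately.
    cpus = os.cpu_count() or 1
    return cpus * 4 if io_bound else cpus
```

On the 4-core machine above this would suggest roughly 4 CPU-bound or 16 I/O-bound concurrent runs as a starting point.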
01/20/2023, 4:53 PMJohn Lewis
01/23/2023, 10:38 PMLaraib Siddiqui
01/24/2023, 2:57 AMEthienne Marcelin
01/24/2023, 3:56 PMOfir
01/24/2023, 5:42 PMKalise Richmond
01/24/2023, 5:50 PMNimesh Kumar
01/25/2023, 3:53 AMGautier
01/25/2023, 3:23 PMtmp-graphql-1 | Running Alembic migrations...
tmp-graphql-1 |
tmp-graphql-1 | Could not upgrade the database!
tmp-graphql-1 | Error: Can't load plugin: sqlalchemy.dialects:postgres
Does anyone have any insight about this?Big Ben
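That "Can't load plugin: sqlalchemy.dialects:postgres" error usually means the database URL uses the legacy postgres:// scheme, which SQLAlchemy 1.4+ no longer accepts in favor of postgresql://. Whether that is the cause in this setup is an assumption, but rewriting the scheme in the connection string is the usual fix:

```python
def normalize_db_url(url: str) -> str:
    # SQLAlchemy 1.4 dropped the legacy "postgres://" scheme; rewrite it
    # to "postgresql://" so dialect lookup succeeds.
    legacy = "postgres://"
    if url.startswith(legacy):
        return "postgresql://" + url[len(legacy):]
    return url

fixed = normalize_db_url("postgres://user:pw@host:5432/db")
```

Heroku-style environments in particular still hand out postgres:// URLs, so normalizing at startup is a common pattern.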
01/25/2023, 4:47 PMEthienne Marcelin
01/25/2023, 4:48 PMArturo Martínez Pacheco
01/27/2023, 12:04 AM