Donny Flynn
10/27/2021, 4:53 PM
Imran Qureshi
10/27/2021, 5:22 PM
Vamsi Reddy
10/27/2021, 6:14 PM
Kevin Mullins
10/27/2021, 6:43 PM
John Muehlhausen
10/27/2021, 7:27 PM
@task(log_stdout=True)
# ...
with Flow(..., executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
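(Editorial aside on the snippet above: one plausible, unconfirmed explanation for missing stdout under the processes scheduler is that output captured by redirecting sys.stdout in one process never includes what a child process prints. A stdlib sketch of that effect, assuming the fork start method, the Linux default:)

```python
import io
import multiprocessing as mp
from contextlib import redirect_stdout

def worker():
    # runs in a separate process; it writes to its own copy of sys.stdout
    print("hello from the child")

buf = io.StringIO()
with redirect_stdout(buf):
    print("hello from the parent")  # same interpreter: captured
    p = mp.Process(target=worker)
    p.start()
    p.join()

# only the parent's print landed in the redirected buffer
assert "parent" in buf.getvalue()
assert "child" not in buf.getvalue()
```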
I see stdout when using the threads scheduler, but not with processes. Prefect version is 0.14.19, and I'm running the flow locally in Jupyter at this point, with no agent. I should also mention that using the threads scheduler causes subsequent runs of the notebook cell to do nothing, but it's no surprise to me that threads would be less stable than subprocesses, whose end state is cleaned up by the OS.
Christopher Chong Tau Teng
10/28/2021, 3:17 AM
I built an image from prefecthq/prefect:latest, which also copied my python script /src/test.py, and ran prefect backend server. Then, in the docker-compose.yaml generated by prefect server config, I added a service client whose command is python /src/test/py.
After running docker-compose up, I got the following error from the client service:
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f24823ea0d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
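(Editorial note: within a compose network, localhost inside the client container is the container itself, so port 4200 there is closed even though Apollo is up. Pointing the client at the Apollo service by name is the usual fix. A hedged sketch; the service name apollo and the PREFECT__CLOUD__API variable are assumptions to verify against the generated file:)

```yaml
client:
  image: prefecthq/prefect:latest
  command: python /src/test.py
  environment:
    # assumed setting name: point the client at the apollo service,
    # not at localhost inside this container
    PREFECT__CLOUD__API: http://apollo:4200
  depends_on:
    - apollo
```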
Meanwhile, I can access the UI, and it shows a successful connection to the Apollo endpoint. It seems like the client fails to connect to the server at localhost:4200?
Pierre Monico
10/28/2021, 9:32 AM
Will List
10/28/2021, 10:57 AM
Ahmed Ezzat
10/28/2021, 3:03 PM
with prefect.Flow('myflow') as flow:
    task_1 = my_task_1()
    task_2 = my_task_2(task_1['value'])
bral
10/28/2021, 3:43 PM
Constantino Schillebeeckx
10/28/2021, 7:13 PM
Is there a way to get the Exception that caused the failure from this callback?
Kelvin Malyar
10/28/2021, 8:20 PM
Steve s
10/28/2021, 8:51 PM
Is there a way to get the flow_id while running?
Harish
10/28/2021, 9:09 PM
Joe
10/28/2021, 9:22 PM
I'm trying to get start_flow_run.map(...) to use the same flow, but iterate across some provided parameters. Unfortunately, when I set flow_name=unmapped('webfs'), I get this error when attempting to run the flow: `ValueError: Received both flow_id and flow_name. Only one flow identifier can be passed.`
Harish
10/28/2021, 10:20 PM
File "/home/prefect_user/venv/lib/python3.6/site-packages/prefect/cli/build_register.py", line 495, in build_and_register
click.echo(f" \u2514\u2500\u2500 ID: {flow_id}")
File "/home/prefect_user/venv/lib/python3.6/site-packages/click/utils.py", line 272, in echo
file.write(message)
UnicodeEncodeError: 'latin-1' codec can't encode characters in position 2-4: ordinal not in range(256)
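(Editorial note: the `\u2514\u2500\u2500` box-drawing characters in that CLI banner cannot be represented in Latin-1, which Python 3.6 may pick for stdout under a C/POSIX locale; exporting PYTHONIOENCODING=utf-8, or a UTF-8 locale such as LC_ALL=C.UTF-8, before registering is the usual workaround. A minimal reproduction of the encode failure:)

```python
# the registration banner contains U+2514/U+2500 box-drawing characters
banner = " \u2514\u2500\u2500 ID: <flow-id>"

# they encode fine as UTF-8 ...
assert banner.encode("utf-8")

# ... but Latin-1 has no code points above 255, so this raises,
# matching the "ordinal not in range(256)" message in the traceback
try:
    banner.encode("latin-1")
    raised = False
except UnicodeEncodeError:
    raised = True
assert raised
```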
Hi, when I do a register I get this weird error, but it registers and everything works fine. I'm on Python 3.6.9. Has anyone encountered this before?
davzucky
10/29/2021, 5:12 AM
data_loader-orion_agent-1 | Starting agent...
data_loader-orion_agent-1 | Agent started! Checking for flow runs...
data_loader-orion_agent-1 | 05:02:24.615 | Submitting flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7'
data_loader-orion_agent-1 | 05:02:24.616 | Completed submission of flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7'
data_loader-orion_agent-1 | 05:02:28.397 | Flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7' exited with exception: Abort('This run has already terminated.')
and the job doesn't do anything.
Connecting to the container and running the command prefect deployment execute load-data/schedulesLoadData executes the workflow, which finishes successfully:
05:04:56.465 | Creating run for flow 'load-data'...
05:04:56.570 | Beginning flow run 'illustrious-nuthatch' for flow 'load-data'...
05:04:56.571 | Starting executor SequentialExecutor...
05:04:56.571 | Flow run 'illustrious-nuthatch' received parameters {'source_path': '/source', 'destination_path': '<s3://credit-risk/raw_data>', 'fs_type_source': 'file', 'fs_type_destination': 'file', 'fs_kwarg_source': {}, 'fs_kwarg_destination': {'client_kwargs': {'endpoint_url': '<http://minio:9000>', 'aws_access_key_id': 'admin', 'aws_secret_access_key': 'welcome123'}}}
05:04:56.628 | Executing flow 'load-data' for flow run 'illustrious-nuthatch'...
05:04:56.629 | Calling loadData('/source', '<s3://credit-risk/raw_data>', 'file', 'file', {}, {'client_kwargs': {'endpoint_url': 'http...)
05:04:56.768 | Submitting task run 'ls_files-c965bedb-0' to executor...
05:04:56.814 | Task run 'ls_files-c965bedb-0' received parameters {'fs_type_source': 'file', 'fs_kwarg_source': {}, 'base_path_source': '/source'}
05:04:56.900 | Executing task 'ls_files' for task run 'ls_files-c965bedb-0'
05:04:56.901 | Calling ls_files('file', {}, '/source')
05:04:57.041 | Task run 'ls_files-c965bedb-0' finished in state Completed(message=None, type=COMPLETED, result=[{'name': '/source/sample_data.csv', 'size': 7564965, 'type': 'file', 'created': 1635483409.8989005, 'islink': False, 'mode': 33188, 'uid': 1000, 'gid': 1000, 'mtime': 1635483409.8989005}], task_run_id=c2d8299a-1586-4310-ae09-2a3aace3fb47)
05:05:00.005 | Shutting down executor SequentialExecutor...
05:05:00.089 | Flow run 'illustrious-nuthatch' finished in state Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=[{'name': '/source/sample_data.csv', 'size': 7564965, 'type': 'file', 'created': 1635483409.8989005, 'islink': False, 'mode': 33188, 'uid': 1000, 'gid': 1000, 'mtime': 1635483409.8989005}], task_run_id=c2d8299a-1586-4310-ae09-2a3aace3fb47)], flow_run_id=a37ea827-4854-49fd-a4c7-e2e6913514b1)
Trying to get more information by setting the env variable PREFECT_DEBUG_MODE to true doesn't change anything. Do you have any input on how to get more information about what is happening on the agent?
William Grim
10/29/2021, 8:14 AM
I've set PREFECT__LOGGING__FORMAT to be a JSON-style string, and I've set PREFECT__LOGGING__EXTRA_LOGGERS to capture logging from various libraries as well.
The issue is that the %(message)s strings that get sent to loggers often come back from libraries in a format that is not JSON-compatible. For example, they will have double quotes in them that are not escaped, and I want to catch these so I can pass everything to json.JSONEncoder().encode(msg) first.
Is there a way to do this? Even if I need to write a different "main" method that sets up an agent for me, I'm willing to do that. I just need to know how/where to set up the hooks.
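(Editorial sketch, not a Prefect-specific hook: instead of splicing %(message)s into a JSON template string, a custom logging.Formatter can build each line with json.dumps, which escapes embedded quotes for you. Where to attach it to an agent's loggers depends on the Prefect version; this only shows the escaping side.)

```python
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit each record as one JSON object; json.dumps handles escaping."""
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        })

# wire the formatter to a handler writing into a string buffer for the demo
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info('payload with "unescaped" quotes')
parsed = json.loads(stream.getvalue().strip())  # valid JSON despite the quotes
assert parsed["message"] == 'payload with "unescaped" quotes'
```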
Much appreciated for any information in advance!!
JulienKWM
10/29/2021, 8:17 AM
@task()
def sample_task(some):
    return some

@task()
def process(current, previous):
    pass

with Flow("test") as flow:
    previous = None
    for n in range(180):
        item = sample_task(n)
        process(item, previous)
        previous = item
Is there another way to do this (maybe inside a task, to handle flow parameters)?
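(Editorial sketch: if the 180 steps don't need to be separate tasks in the UI, the whole current/previous chain can run inside one task, keeping the flow graph small. Plain Python with hypothetical helper names; the Prefect wiring is omitted:)

```python
def sample(n):
    # stand-in for the per-item work done by sample_task
    return n

def process(current, previous):
    # stand-in for the pairwise work; here it just records the pair
    return (current, previous)

def process_chain(count):
    """Run the entire current/previous chain in one function call."""
    previous = None
    results = []
    for n in range(count):
        item = sample(n)
        results.append(process(item, previous))
        previous = item
    return results

pairs = process_chain(4)
assert pairs == [(0, None), (1, 0), (2, 1), (3, 2)]
```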
Thank you for your help!
haf
10/29/2021, 8:47 AM
Using dask_endpoint=tcp://dask-scheduler.flows.svc:8786
Registering flow with label=['prod'] image=example/data-pipelines:cb40d2797195791a3cf195fa1906a1722222222
Registering... executor=DaskExecutor, run_config=KubernetesRun.
From this code:
if args.dask:
    print(f"Using dask_endpoint={args.dask_endpoint}")
    flow.executor = DaskExecutor(address=args.dask_endpoint)
print(
    f" executor={type(flow.executor).__name__}, run_config={type(flow.run_config).__name__}, result={type(flow.result).__name__}."
)
flow.register(
    project_name=args.project_name,
    build=args.build,
    idempotency_key=args.commit_ref,
    labels=args.labels,
    add_default_labels=False,
)
It just spawns prefect-job values...
Kevin Kho
Vamsi Reddy
10/29/2021, 2:02 PM
Arun Giridharan
10/29/2021, 2:52 PM
get_task_run_result is giving a FlowRunView object as a result instead of the task result that I'm expecting.
Pedro Machado
10/29/2021, 2:57 PM
I have a ShellTask that runs a python script. I am trying to get the logs to show in Prefect Cloud in real time. I am trying to use the PYTHONUNBUFFERED environment variable to force it to print output as it happens, but it's not working. I also tried calling the script with the -u option. Any ideas?
haf
10/29/2021, 3:01 PM
Will
10/29/2021, 3:10 PM
Sahil Chopra
10/29/2021, 5:25 PM
I have a monitor_scrape task that is monitoring the status of a long-running job. At the completion of this task, I want to send out an email that provides stats output from that task. I'm passing the output of the monitor_scrape task into my email task as the message, but the email seems to be firing independently (see diagram). Any pointers on what I might be doing wrong?
Flow Code:
completion_email_task = EmailTask(email_to='xxx', email_to_cc='xxx')

@task()
def monitor_scraper_run_status() -> Output:
    # task code here

def create_flow(flow_name: str) -> Flow:
    with Flow(
        flow_name,
        state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
    ) as flow:
        output = monitor_scraper_run_status()
        completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    return flow
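(Editorial note, an assumption from the code alone: output.__repr__() is evaluated when the flow is defined, on the Task object itself rather than on its runtime result, so no dependency edge ties the email task to monitor_scraper_run_status. Passing output directly, or deferring the repr into its own task, usually restores the edge. A stdlib illustration of eager vs deferred conversion:)

```python
class TaskPlaceholder:
    """Stand-in for what a Prefect task call returns at flow-definition time."""
    def __repr__(self):
        return "<TaskPlaceholder>"

placeholder = TaskPlaceholder()

# evaluated immediately: the flow only ever sees this fixed string,
# so nothing links the email to the monitor task's output
eager_msg = placeholder.__repr__()
assert eager_msg == "<TaskPlaceholder>"

# deferred: the conversion runs later, once the real result exists
def stringify(value):
    return repr(value)

assert stringify({"rows": 10}) == "{'rows': 10}"
```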
Payam K
10/29/2021, 8:00 PM
Jacob Bedard
10/29/2021, 11:02 PM
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Can someone who's had this same issue let me know what this is?
My flow runs OK from the machine where I have the agent, but I can't get it to run when I attempt scheduling or even an ad-hoc run from the Prefect Cloud UI.
haf
10/30/2021, 4:07 PM
I have a mapped pipeline A -> B[0..N] -> C[0..N].
Sometimes some index N fails, and then C never runs, because at least one of the upstream mapped tasks failed.
However, I'd always like to run C for the indices that worked, and I want the flow itself to be marked successful despite a few mappings failing.
How could I configure this?
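(Editorial sketch of the reduction logic only, in plain Python with hypothetical names: keep the B results that succeeded and map C over just those. In Prefect 1.x terms this is usually spelled with an all_finished trigger plus a FilterTask between B and C, and flow.set_reference_tasks(...) so a few mapped failures don't fail the whole flow; verify against your version's docs.)

```python
def b(n):
    # stand-in for the mapped B task; odd indices fail
    if n % 2:
        raise ValueError(f"index {n} failed")
    return n * 10

def c(value):
    # stand-in for the mapped C task
    return value + 1

# run B over all indices, collecting results or exceptions
b_states = []
for n in range(5):
    try:
        b_states.append(b(n))
    except Exception as exc:
        b_states.append(exc)

# "filter" step: drop the failures, mirroring what a FilterTask does
successes = [r for r in b_states if not isinstance(r, Exception)]
c_results = [c(r) for r in successes]
assert c_results == [1, 21, 41]
```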