Xinyi Guo
10/27/2021, 9:43 AM

Arun Giridharan
10/27/2021, 1:13 PM

serhan
10/27/2021, 1:22 PM

Eric Feldman
10/27/2021, 4:02 PM
new_state.result keeps being an empty dict when running on a Prefect agent. If I'm running the flow using flow.run, I do see the results of the state.

Jason Boorn
10/27/2021, 4:08 PM

Donny Flynn
10/27/2021, 4:53 PM

Imran Qureshi
10/27/2021, 5:22 PM

Vamsi Reddy
10/27/2021, 6:14 PM

Kevin Mullins
10/27/2021, 6:43 PM

John Muehlhausen
10/27/2021, 7:27 PM
@task(log_stdout=True)
# ...
with Flow(..., executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
I see stdout when using the threads scheduler, but not with processes. Prefect version is 0.14.19. I'm running the flow locally in Jupyter at this point, no agent. I should also mention that using the threads scheduler causes subsequent runs of the notebook cell to do nothing, but it is no surprise to me that threads would be less stable than subprocesses whose end state is cleaned up by the OS.
Christopher Chong Tau Teng
10/28/2021, 3:17 AM
I built an image from prefecthq/prefect:latest, which also copied my python script /src/test.py and ran prefect backend server. Then, in the docker-compose.yaml generated by prefect server config, I added a service client whose command is python /src/test/py. After running docker-compose up, I got the following error from the client service:
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f24823ea0d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
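Inside a compose network, localhost in the client container is the client container itself, not the server. A hedged sketch of pointing the client at the Apollo service by its compose DNS name (the service name apollo matches the compose file Prefect Server generates, but the PREFECT__SERVER__* environment variables are an assumption to verify against your Prefect 0.x config docs):

```yaml
services:
  client:
    image: my-prefect-client        # hypothetical image containing /src/test.py
    command: python /src/test.py
    environment:
      # reach Apollo via its service name, not localhost
      - PREFECT__SERVER__HOST=http://apollo
      - PREFECT__SERVER__PORT=4200
    depends_on:
      - apollo
```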
Meanwhile, I can access the UI, and the UI shows it's connected to the Apollo endpoint successfully. It seems like the client service fails to connect to the server at localhost:4200?

Pierre Monico
10/28/2021, 9:32 AM

Will List
10/28/2021, 10:57 AM

Ahmed Ezzat
10/28/2021, 3:03 PM
with prefect.Flow('myflow') as flow:
    task_1 = my_task_1()
    task_2 = my_task_2(task_1['value'])
bral
10/28/2021, 3:43 PM

Constantino Schillebeeckx
10/28/2021, 7:13 PM
How can I get the Exception that caused the failure from this callback?

Kelvin Malyar
10/28/2021, 8:20 PM

Steve s
10/28/2021, 8:51 PM
How can I get the flow_id while running?

Harish
10/28/2021, 9:09 PM

Joe
10/28/2021, 9:22 PM
I want start_flow_run.map(...) to use the same flow, but iterate across some provided parameters. Unfortunately, when I set flow_name=unmapped('webfs'), I get this error when attempting to run the flow: `ValueError: Received both flow_id and flow_name. Only one flow identifier can be passed.`

Harish
10/28/2021, 10:20 PM
File "/home/prefect_user/venv/lib/python3.6/site-packages/prefect/cli/build_register.py", line 495, in build_and_register
click.echo(f" \u2514\u2500\u2500 ID: {flow_id}")
File "/home/prefect_user/venv/lib/python3.6/site-packages/click/utils.py", line 272, in echo
file.write(message)
UnicodeEncodeError: 'latin-1' codec can't encode characters in position 2-4: ordinal not in range(256)
Hi, when I do a register I get this weird error, but it registers and everything works fine. I'm on Python 3.6.9. Has anyone encountered this before?
davzucky
10/29/2021, 5:12 AM
data_loader-orion_agent-1 | Starting agent...
data_loader-orion_agent-1 | Agent started! Checking for flow runs...
data_loader-orion_agent-1 | 05:02:24.615 | Submitting flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7'
data_loader-orion_agent-1 | 05:02:24.616 | Completed submission of flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7'
data_loader-orion_agent-1 | 05:02:28.397 | Flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7' exited with exception: Abort('This run has already terminated.')
and the job doesn't do anything.
Connecting to the container and running the command prefect deployment execute load-data/schedulesLoadData executes the workflow, which finishes successfully:
05:04:56.465 | Creating run for flow 'load-data'...
05:04:56.570 | Beginning flow run 'illustrious-nuthatch' for flow 'load-data'...
05:04:56.571 | Starting executor SequentialExecutor...
05:04:56.571 | Flow run 'illustrious-nuthatch' received parameters {'source_path': '/source', 'destination_path': '<s3://credit-risk/raw_data>', 'fs_type_source': 'file', 'fs_type_destination': 'file', 'fs_kwarg_source': {}, 'fs_kwarg_destination': {'client_kwargs': {'endpoint_url': '<http://minio:9000>', 'aws_access_key_id': 'admin', 'aws_secret_access_key': 'welcome123'}}}
05:04:56.628 | Executing flow 'load-data' for flow run 'illustrious-nuthatch'...
05:04:56.629 | Calling loadData('/source', '<s3://credit-risk/raw_data>', 'file', 'file', {}, {'client_kwargs': {'endpoint_url': 'http...)
05:04:56.768 | Submitting task run 'ls_files-c965bedb-0' to executor...
05:04:56.814 | Task run 'ls_files-c965bedb-0' received parameters {'fs_type_source': 'file', 'fs_kwarg_source': {}, 'base_path_source': '/source'}
05:04:56.900 | Executing task 'ls_files' for task run 'ls_files-c965bedb-0'
05:04:56.901 | Calling ls_files('file', {}, '/source')
05:04:57.041 | Task run 'ls_files-c965bedb-0' finished in state Completed(message=None, type=COMPLETED, result=[{'name': '/source/sample_data.csv', 'size': 7564965, 'type': 'file', 'created': 1635483409.8989005, 'islink': False, 'mode': 33188, 'uid': 1000, 'gid': 1000, 'mtime': 1635483409.8989005}], task_run_id=c2d8299a-1586-4310-ae09-2a3aace3fb47)
05:05:00.005 | Shutting down executor SequentialExecutor...
05:05:00.089 | Flow run 'illustrious-nuthatch' finished in state Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=[{'name': '/source/sample_data.csv', 'size': 7564965, 'type': 'file', 'created': 1635483409.8989005, 'islink': False, 'mode': 33188, 'uid': 1000, 'gid': 1000, 'mtime': 1635483409.8989005}], task_run_id=c2d8299a-1586-4310-ae09-2a3aace3fb47)], flow_run_id=a37ea827-4854-49fd-a4c7-e2e6913514b1)
Trying to get more information by setting the env variable PREFECT_DEBUG_MODE to true doesn't change anything. Do you have any input on how to get more information about what is happening on the agent?

William Grim
10/29/2021, 8:14 AM
I've set PREFECT__LOGGING__FORMAT to be a JSON-style string, and I've set PREFECT__LOGGING__EXTRA_LOGGERS to capture logging from various libraries as well.
The issue is that the %(message)s strings sent to loggers often come back from libraries in a format that is not JSON-compatible. For example, they will have unescaped double-quotes in them, and I want to catch these so I can pass everything through json.JSONEncoder().encode(msg) first.
Is there a way to do this? Even if I need to write a different "main" method that sets up an agent for me, I'm willing to do that. I just need to know how/where to set up the hooks.
Much appreciated for any information in advance!!

JulienKWM
10/29/2021, 8:17 AM
@task()
def sample_task(some):
    return some

@task()
def process(current, previous):
    pass

with Flow("test") as flow:
    previous = None
    for n in range(180):
        item = sample_task(n)
        process(item, previous)
        previous = item
Is there another way to do this (inside a task maybe, to handle flow parameters)?
Thank you for your help!

haf
10/29/2021, 8:47 AM
Using dask_endpoint=<tcp://dask-scheduler.flows.svc:8786>
Registering flow with label=['prod'] image=example/data-pipelines:cb40d2797195791a3cf195fa1906a1722222222
Registering... executor=DaskExecutor, run_config=KubernetesRun.
From this code:
if args.dask:
    print(f"Using dask_endpoint={args.dask_endpoint}")
    flow.executor = DaskExecutor(address=args.dask_endpoint)

print(
    f" executor={type(flow.executor).__name__}, run_config={type(flow.run_config).__name__}, result={type(flow.result).__name__}."
)

flow.register(
    project_name=args.project_name,
    build=args.build,
    idempotency_key=args.commit_ref,
    labels=args.labels,
    add_default_labels=False,
)
It just spawns prefect-job values...

Kevin Kho
10/29/2021, 1:33 PM

Vamsi Reddy
10/29/2021, 2:02 PM

Arun Giridharan
10/29/2021, 2:52 PM
get_task_run_result is giving a FlowRunView object as a result instead of the task result that I'm expecting.

Pedro Machado
10/29/2021, 2:57 PM
I have a ShellTask that runs a Python script. I am trying to get the logs to show in Prefect Cloud in real time. I am trying to use the PYTHONUNBUFFERED environment variable to force it to print output as it happens, but it's not working. I also tried calling the script with the -u option. Any ideas?
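For reference, the two usual spellings of "unbuffered" look like this (a sketch with a hypothetical script.py; note that seeing lines in Prefect Cloud in real time also depends on the ShellTask streaming its output to the logger as it runs — Prefect 0.x's ShellTask has a stream_output flag worth checking in your version's docs):

```shell
# force the child Python interpreter to flush stdout line by line
PYTHONUNBUFFERED=1 python script.py

# equivalent:
python -u script.py
```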
haf
10/29/2021, 3:01 PM

haf
10/29/2021, 3:01 PM

Sylvain Hazard
10/29/2021, 3:03 PM
Both the DaskExecutor and the LocalDaskExecutor perform Depth First Execution if you chain mapped tasks.

Kevin Kho
10/29/2021, 3:03 PM

haf
10/29/2021, 3:04 PM

Sylvain Hazard
10/29/2021, 3:10 PM
With the LocalDaskExecutor, I have found it slightly easier to set up.