Alexander
11/20/2020, 12:01 PM
Zach
11/20/2020, 7:35 PM
PickleSerializer() to store during a Prefect flow?
Zach
11/20/2020, 7:36 PM
@task(
    checkpoint=True,
    result=GCSResult(
        bucket="my-bucket",
        location=RESULT_LOCATION_FORMAT,
        serializer=PickleSerializer(),
    )
)
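As a rough illustration of the two pieces in the snippet above, here is a standard-library sketch of a templated result location and a pickle round trip. It is not Prefect's implementation: the template string, the context values, and the helper names `render_location` and `roundtrip` are all made up for the example, and the real value of RESULT_LOCATION_FORMAT is not shown in the thread.

```python
import pickle
from datetime import date

# Hypothetical template; the thread does not show the real RESULT_LOCATION_FORMAT.
RESULT_LOCATION_FORMAT = "{flow_name}/{task_name}/{today}.pickle"


def render_location(template, **context):
    """Fill a location template with per-run values using str.format."""
    return template.format(**context)


def roundtrip(value):
    """Serialize a value to bytes with pickle, then load it back."""
    payload = pickle.dumps(value)
    return pickle.loads(payload)


# Made-up context values, purely for illustration.
location = render_location(
    RESULT_LOCATION_FORMAT,
    flow_name="my-flow",
    task_name="my-task",
    today=date(2020, 11, 20).isoformat(),
)
print(location)
print(roundtrip({"rows": [1, 2, 3]}))
```

The point of the sketch is only that the location is a string template resolved per run, and that the serializer decides how the task's return value becomes bytes at that location.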
Hamed Sheykhlou
11/20/2020, 9:10 PM
import prefect
import time
from prefect import task, Flow
from prefect.environments.storage import Docker

@task()
def whoami():
    logger = prefect.context.get("logger")
    time.sleep(200)
    logger.info('Hello world')
    return 'hello World'

storage = Docker()
flow = Flow("reddit-flow", storage=storage, tasks=[whoami])
built_storage = flow.storage.build(push=False)
print(built_storage.flows)
and ran it with python code.py.
The image was created successfully, but the flow doesn't exist inside the image, neither in /root/.prefect/ nor in /opt/prefect/flows/. But when I change the code to register the flow, the created image has the flow. (Also, I think the local debugging documentation is outdated.)
Am I missing something?
Thank you in advance.
Dolor Oculus
11/20/2020, 9:50 PM
Dolor Oculus
11/20/2020, 10:00 PM
not_filters=[
    filters.is_day_of_week(1),
    filters.is_day_of_week(7)
],
Yuri Caruso
11/20/2020, 10:47 PM
from os import listdir, walk
from os.path import abspath, dirname, isdir, join

from prefect.agent.local.agent import LocalAgent

# Directory containing this script; the process may be started from a different working directory.
path = abspath(dirname(__file__))

import_paths = []
for file in listdir(path):
    # Check the absolute path: a bare name would be resolved against the current working directory.
    if not file.startswith(".") and isdir(join(path, file)) and file in ['db', 'service', 'flows', 'util']:
        import_paths.append(join(path, file))

# Iterate over a copy, because the list is extended inside the loop.
for path_import in list(import_paths):
    paths = [join(root, name)
             for root, dirs, files in walk(path_import)
             for name in dirs
             if not name.startswith(".") and not name.startswith("__")]
    import_paths.extend(paths)

LocalAgent(import_paths=import_paths, show_flow_logs=True).start()
My Supervisor File is as follows:
[program:start_agent_prefect]
command=python3 /home/dev/projetos/prefect-tasks/agent.py
user=dev
autostart=true
autorestart=true
stderr_logfile=/var/log/agent_prefect.err.log
stdout_logfile=/var/log/agent_prefect.out.log
However, the Prefect agent is unable to execute the flows because it cannot find the prefect executable, giving the message below:
[Errno 2] No such file or directory: 'prefect'
[2020-11-20 22:21:28,139] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-11-20 22:21:28,176] INFO - agent | Deploying flow run 3ab508f4-731c-450b-8b30-bdaaaeba6117
[2020-11-20 22:21:28,180] ERROR - agent | Logging platform error for flow run 3ab508f4-731c-450b-8b30-bdaaaeba6117
[2020-11-20 22:21:28,232] ERROR - agent | Error while deploying flow: FileNotFoundError(2, 'No such file or directory')
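The `[Errno 2] No such file or directory: 'prefect'` message above suggests that the process launching the flow run could not resolve a `prefect` command on its PATH. One plausible cause, and this is an assumption rather than something confirmed in the thread, is that the PATH Supervisor gives the program differs from the PATH of an interactive shell. A minimal diagnostic sketch using only the standard library (`find_executable` is a hypothetical helper name):

```python
import os
import shutil


def find_executable(name, search_path):
    """Return the full path of `name` if it can be resolved on `search_path`, else None."""
    return shutil.which(name, path=search_path)


if __name__ == "__main__":
    # Log what this process actually sees; compare the output under Supervisor
    # with the output of a manual run from a shell.
    current_path = os.environ.get("PATH", "")
    print("PATH =", current_path)
    print("prefect resolves to:", find_executable("prefect", current_path))
```

If the second line prints None only when started by Supervisor, adding the directory that contains the `prefect` script to PATH through the program's `environment=` setting in the Supervisor config would be one thing to try.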
Note: if I run the agent.py file manually from its folder with:
dev $ python3 agent.py
the agent works normally. Could someone shed some light on where I am going wrong?
Dolor Oculus
11/20/2020, 11:59 PM
Krzysztof Nawara
11/21/2020, 11:28 AM
Eric
11/21/2020, 6:55 PM
Eric
11/21/2020, 6:55 PM
Avi A
11/22/2020, 3:52 PM
Ramses E.
11/23/2020, 6:47 AM
Scott Moreland
11/23/2020, 12:58 PM
sql_context.read.table. Are there any references for this?
Joël Luijmes
11/23/2020, 1:01 PM
Dolor Oculus
11/23/2020, 2:41 PM
PREFECT__LOGGING__EXTRA_LOGGERS as per https://docs.prefect.io/core/concepts/logging.html#extra-loggers during flow registration isn't sufficient: I'm not seeing my app's logging statements in the web UI. Is there a way to get this env variable setting to be in effect when the app runs?
Ben Davison
11/23/2020, 4:26 PM
DaskKubernetesEnvironment and am running into a weird issue.
tkanas
11/24/2020, 1:16 AM
Saulius Beinorius
11/24/2020, 8:22 AM
flow?
Riley Hun
11/24/2020, 8:25 AM
Michelle Wu
11/24/2020, 9:07 AM
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
Traceback (most recent call last):
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/worker.py", line 3268, in loads_function
    result = cache_loads[bytes_object]
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/utils.py", line 1573, in __getitem__
    value = super().__getitem__(key)
  File "/anaconda3_a/envs/py37/lib/python3.7/collections/__init__.py", line 1027, in __getitem__
    raise KeyError(key)
KeyError: b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
...
ModuleNotFoundError: No module named 'prefect'
I assumed that I must install Prefect on machine B for the task to run remotely, so I installed it and reran the task. A new error then occurred on machine B:
[2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
Traceback (most recent call last):
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
...
[2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
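The errors above all come from attempts to reach `localhost:4200` that are refused. Since they are logged on machine B, one reading (an assumption, not confirmed in the thread) is that the worker is looking for the Prefect API on its own host while the API actually runs elsewhere. A small standard-library sketch for checking whether an address accepts TCP connections from a given machine (`can_connect` is a hypothetical helper name):

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    # Run this on the worker machine; replace the host with the address
    # of the machine that actually serves the API to compare.
    print("localhost:4200 reachable:", can_connect("localhost", 4200))
```

If `localhost:4200` is unreachable from machine B but the API host's address is reachable, the worker's Prefect configuration probably needs to point at that address instead of the default.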
I am really confused by “ConnectionError” and “Unable to write logs to Prefect Cloud”. Do you guys have any idea why this might be happening? Must I install Docker (on machine B) as well for the task to run?
Saulius Beinorius
11/24/2020, 9:38 AM
logging.dictConfig()-like format, especially since Dask supports it (https://docs.dask.org/en/latest/debugging.html#logs). If you could point me in the right direction where the logging setup happens (I think I found the agent's logging setup, but not sure about task logging, I assume that's separate for each execution environment?), I would be willing to work on a PR to allow specifying logging via dict format. For reference, I have `uvicorn`'s server, which has similar options to Prefect, but also allows passing the entire logging configuration.
Florian L
11/24/2020, 10:35 AM
signal.ENDRUN(state.Cancelled()), which puts my task in the right state, but the following tasks are still executed, meaning the trigger all_successful is still true. Any idea what I am doing wrong?
Andrey Tatarinov
11/24/2020, 11:17 AM
ReadGsheetRow, ReadAirtableRow, etc.
What is their use case? All of these tasks, both the Read- and Write- counterparts, receive n, the identifier of the row to be read or written.
In our experience there is no scenario in which you need just a single row from a table. What's more, there is no way to find out the size of the table. So is anybody actually using these tasks in this specific implementation?
Andrey Tatarinov
11/24/2020, 1:51 PM
Scott Moreland
11/24/2020, 2:42 PM
Riley Hun
11/24/2020, 5:44 PM
Jasono
11/24/2020, 6:00 PM
Luke Orland
11/24/2020, 6:02 PM
flow.run(run_on_schedule=False) or using FlowRunner, without using an agent?
josh
11/24/2020, 8:02 PM
0.13.17 has been released and here are a few notable changes:
Nicer support for tasks with multiple results
⚠️ Improved logs and warnings
♻️ Cleaner flow-run restarts when using resource managers
#️⃣ Squashed bug in flow-hashing for idempotency
A big thank you to our contributors who helped out with this release! Full changelog:
https://github.com/PrefectHQ/prefect/releases/tag/0.13.17