Klemen Strojan
11/20/2020, 9:18 AM
Alexander
11/20/2020, 11:06 AM
Alexander
11/20/2020, 12:01 PM
Zach
11/20/2020, 7:35 PM
PickleSerializer() to store during a prefect flow?
Zach
11/20/2020, 7:36 PM
from prefect import task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PickleSerializer

@task(
    checkpoint=True,
    result=GCSResult(
        bucket="my-bucket",
        location=RESULT_LOCATION_FORMAT,
        serializer=PickleSerializer(),
    )
)
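(Aside: RESULT_LOCATION_FORMAT is not defined in the snippet above. In Prefect 0.x a result location can be a plain string or a template filled from prefect.context at run time; the template below is purely a hypothetical example, not taken from the thread:)

# Hypothetical location template; fields are filled from prefect.context at run time
RESULT_LOCATION_FORMAT = "{flow_name}/{task_name}/{task_run_id}.pickle"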
Hamed Sheykhlou
11/20/2020, 9:10 PM
import prefect
import time
from prefect import task, Flow
from prefect.environments.storage import Docker

@task()
def whoami():
    logger = prefect.context.get("logger")
    time.sleep(200)
    logger.info('Hello world')
    return 'hello World'

storage = Docker()
flow = Flow("reddit-flow", storage=storage, tasks=[whoami])
built_storage = flow.storage.build(push=False)
print(built_storage.flows)
I run it with python code.py, and the image is built successfully, but the flow doesn't exist inside the image: neither inside /root/.prefect/ nor /opt/prefect/flows/. But when I change the code to register the flow, the created image does have the flow. (Also, I think the local-debugging documentation is outdated.) Am I missing something? Thank you in advance.
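(A likely cause, sketched under the assumption that nothing else adds the flow to the storage: in Prefect 0.x, Docker storage only bakes in flows that have been added to it, which flow.register() does implicitly. For local debugging you can add the flow explicitly before building:)

storage = Docker()
storage.add_flow(flow)            # register() normally does this for you
built_storage = storage.build(push=False)
print(built_storage.flows)        # the flow should now appear here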
Dolor Oculus
11/20/2020, 9:50 PM
Dolor Oculus
11/20/2020, 10:00 PM
not_filters=[
    filters.is_day_of_week(1),
    filters.is_day_of_week(7)
],
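(For context, a minimal sketch of where such not_filters might sit in a full Prefect 0.x schedule; the hourly clock is an assumption:)

from datetime import timedelta
from prefect.schedules import Schedule, filters
from prefect.schedules.clocks import IntervalClock

# fire hourly, but skip any run that lands on day-of-week 1 or 7
schedule = Schedule(
    clocks=[IntervalClock(timedelta(hours=1))],
    not_filters=[
        filters.is_day_of_week(1),
        filters.is_day_of_week(7),
    ],
)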
Yuri Caruso
11/20/2020, 10:47 PM
from prefect.agent.local.agent import LocalAgent
from os.path import dirname, abspath, join
from os import listdir, path as p, walk

path = abspath(join(dirname(__file__)))
import_paths = []
for file in listdir(path):
    if not file.startswith(".") and p.isdir(file) and file in ['db', 'service', 'flows', 'util']:
        import_paths.append(join(path, file))
for path_import in import_paths:
    paths = [p.join(root, name)
             for root, dirs, files in walk(path_import)
             for name in dirs
             if not name.startswith(".") and not name.startswith("__")]
    import_paths.extend(paths)

LocalAgent(import_paths=import_paths, show_flow_logs=True).start()
My Supervisor File is as follows:
[program:start_agent_prefect]
command=python3 /home/dev/projetos/prefect-tasks/agent.py
user=dev
autostart=true
autorestart=true
stderr_logfile=/var/log/agent_prefect.err.log
stdout_logfile=/var/log/agent_prefect.out.log
However, the prefect agent is unable to execute the flows, as it cannot find the prefect executable, giving the messages below:
[Errno 2] No such file or directory: 'prefect'
[2020-11-20 22:21:28,139] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-11-20 22:21:28,176] INFO - agent | Deploying flow run 3ab508f4-731c-450b-8b30-bdaaaeba6117
[2020-11-20 22:21:28,180] ERROR - agent | Logging platform error for flow run 3ab508f4-731c-450b-8b30-bdaaaeba6117
[2020-11-20 22:21:28,232] ERROR - agent | Error while deploying flow: FileNotFoundError(2, 'No such file or directory')
Note: if I manually run the agent.py file in its folder with:
dev $ python3 agent.py
the agent works normally. Could someone shed some light on where I'm going wrong?
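(A hedged guess at the cause: under supervisor the process inherits a minimal PATH that may not include the directory holding the prefect CLI, so the agent cannot spawn prefect subprocesses; running agent.py from an interactive shell works because that shell's PATH does include it. One possible fix in the supervisor config; the PATH value below is only an example:)

[program:start_agent_prefect]
command=python3 /home/dev/projetos/prefect-tasks/agent.py
user=dev
; make the prefect CLI visible to spawned flow-run subprocesses
environment=PATH="/home/dev/.local/bin:%(ENV_PATH)s"
autostart=true
autorestart=true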
Dolor Oculus
11/20/2020, 11:59 PM
Krzysztof Nawara
11/21/2020, 11:28 AM
Eric
11/21/2020, 6:55 PM
Eric
11/21/2020, 6:55 PM
Avi A
11/22/2020, 3:52 PM
Ramses E.
11/23/2020, 6:47 AM
Scott Moreland
11/23/2020, 12:58 PM
sql_context.read.table. Are there any references for this?
Joël Luijmes
11/23/2020, 1:01 PM
Dolor Oculus
11/23/2020, 2:41 PM
Setting PREFECT__LOGGING__EXTRA_LOGGERS as per https://docs.prefect.io/core/concepts/logging.html#extra-loggers during flow registration isn't sufficient -- I'm not seeing my app's logging statements in the web UI. Is there a way to get this env variable setting to be in effect for when the app runs?
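(A hedged pointer: the env variable has to be present in the environment where the flow actually executes, not just where it is registered. For a local agent that might look like the following; the logger name "my_app" is an assumption:)

export PREFECT__LOGGING__EXTRA_LOGGERS="['my_app']"
prefect agent start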
Ben Davison
11/23/2020, 4:26 PM
I'm using DaskKubernetesEnvironment and am running into a weird issue.
tkanas
11/24/2020, 1:16 AM
Saulius Beinorius
11/24/2020, 8:22 AM
flow?
Riley Hun
11/24/2020, 8:25 AM
Michelle Wu
11/24/2020, 9:07 AM
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
Traceback (most recent call last):
File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/worker.py", line 3268, in loads_function
result = cache_loads[bytes_object]
File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/utils.py", line 1573, in __getitem__
value = super().__getitem__(key)
File "/anaconda3_a/envs/py37/lib/python3.7/collections/__init__.py", line 1027, in __getitem__
raise KeyError(key)
KeyError: b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
...
ModuleNotFoundError: No module named 'prefect'
I figured I needed to install prefect on machine B for the task to run remotely. After installing prefect and rerunning the task, a new error occurred on machine B:
[2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
Traceback (most recent call last):
File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
(self._dns_host, self.port), self.timeout, **extra_kw
File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
raise err
File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
...
[2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
I am really confused by the “ConnectionError” and “Unable to write logs to Prefect Cloud” messages. Do you have any idea why this might be happening? Must I install Docker on machine B as well for the task to run?
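(A hedged reading of the traceback: the worker on machine B is trying to reach the Prefect Server GraphQL API at localhost:4200, but the server runs on a different machine, so the connection is refused. One possible fix is to point machine B at the server's reachable address before the worker starts; the host below is a placeholder:)

export PREFECT__CLOUD__API="http://<server-host>:4200"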
Saulius Beinorius
11/24/2020, 9:38 AM
I'd like to be able to configure logging in a logging.dictConfig()-like format, especially since Dask supports it (https://docs.dask.org/en/latest/debugging.html#logs). If you could point me in the right direction as to where the logging setup happens (I think I found the agent's logging setup, but I'm not sure about task logging -- I assume that's separate for each execution environment?), I would be willing to work on a PR to allow specifying logging via a dict format. For reference, `uvicorn`'s server has options similar to Prefect's, but also allows passing the entire logging configuration.
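(For reference, the kind of configuration being asked about -- a standard-library logging.config.dictConfig() payload; the logger names and format here are purely illustrative:)

import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {"format": "%(asctime)s %(levelname)s %(name)s | %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "default"},
    },
    "loggers": {
        "prefect": {"handlers": ["console"], "level": "INFO"},
        "my_app": {"handlers": ["console"], "level": "DEBUG"},
    },
}
logging.config.dictConfig(LOGGING_CONFIG)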
Florian L
11/24/2020, 10:35 AM
I raise signal.ENDRUN(state.Cancelled()), which puts my task in the right state, but the following tasks are still executed, meaning the all_successful trigger still evaluates to true. Any idea what I am doing wrong?
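(A hedged sketch of the raising pattern being described; whether a Cancelled state satisfies all_successful is version-dependent, so one commonly suggested alternative is to raise SKIP, which downstream tasks propagate by default:)

from prefect import task
from prefect.engine import signals

@task
def maybe_stop():
    # SKIP propagates: downstream tasks with default settings will skip too
    raise signals.SKIP("nothing to do today")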
Andrey Tatarinov
11/24/2020, 11:17 AM
ReadGsheetRow, ReadAirtableRow, etc. -- what is their use case? All of these Read- and Write- task counterparts receive n, the identifier of the row to be read or written. In our experience there's no scenario where you need just a single row from a table. What's more, there's no way to find out the size of the table. So is anybody actually using these tasks in this specific implementation?
11/24/2020, 1:51 PM
Scott Moreland
11/24/2020, 2:42 PM
Riley Hun
11/24/2020, 5:44 PM
Jasono
11/24/2020, 6:00 PM
Jasono
11/24/2020, 6:00 PM
josh
11/24/2020, 6:52 PM
Jasono
11/24/2020, 6:55 PM
josh
11/24/2020, 6:57 PM
Jasono
11/24/2020, 6:58 PM
josh
11/24/2020, 7:02 PM
with Flow():
    my_task(context.get("reports"))
That will not work, because the context is evaluated at initialization of the flow object, before registration. Consequently, the context provided by the UI will not be read there, because that code runs prior to the flow run. Instead, your context should be accessed inside of a task:
@task
def my_task():
    prefect.context.get("reports")
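(A minimal runnable sketch of the pattern josh describes; the "reports" key and the default value are assumptions:)

import prefect
from prefect import task, Flow

@task
def my_task():
    # context is populated at flow-run time, so read it inside the task body
    reports = prefect.context.get("reports", [])
    prefect.context.get("logger").info("reports: %s", reports)

with Flow("context-demo") as flow:
    my_task()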
Jasono
11/24/2020, 7:06 PM
with Flow("bdx_process_9xx") as flow:
    acctDate = str(prefect.context.acctDate)
    prodDate = str(prefect.context.prodDate)
    runDesc9xx = prefect.context.runDesc9xx
    print(f'ProdDate={prodDate} ..{runDesc9xx}')
    for queryKey, (queryCmd, dependQry) in cmdPairs(acctDate, prodDate, runDesc9xx).items():
        task = BDX_Task(name=queryKey)(queryKey=queryKey, queryCmd=queryCmd)
        flow.add_task(task)
        if len(dependQry) > 0:
            if type(dependQry) is list:
                for q in dependQry:
                    flow.get_tasks(name=queryKey)[0].set_upstream(flow.get_tasks(name=q)[0])
            else:
                flow.get_tasks(name=queryKey)[0].set_upstream(flow.get_tasks(name=dependQry)[0])
josh
11/24/2020, 7:15 PM
Jasono
11/24/2020, 7:16 PM