aaron
08/04/2022, 6:09 PM

Chu
08/04/2022, 6:25 PM
with Flow block, and the list it returns will be used in create_flow_run.map(parameters=...). My question is: once we've deployed the flow (it runs every midnight), will the function we defined be called each time the flow is scheduled to run?

Jai P
08/04/2022, 6:26 PM

Keith
08/04/2022, 6:40 PM
I'm using prefecthq/prefect:2.0.2-python3.10, and when trying to access GCS it appears that gcsfs is not installed. I am getting the following error from my flow. Should I build a custom image that installs gcsfs?
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 232, in get_filesystem_class
register_implementation(protocol, _import_class(bit["class"]))
File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 255, in _import_class
mod = importlib.import_module(mod)
File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'gcsfs'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 312, in filesystem
self._filesystem = fsspec.filesystem(scheme, **self.settings)
File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 265, in filesystem
cls = get_filesystem_class(protocol)
File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 234, in get_filesystem_class
raise ImportError(bit["err"]) from e
ImportError: Please install gcsfs to access Google Storage
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 454, in get_directory
return await self.filesystem.get_directory(
File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 251, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 315, in filesystem
raise RuntimeError(
RuntimeError: File system created with scheme 'gcs' from base path 'gcs://<bucket>/deployments' could not be created. You are likely missing a Python module required to use the given storage protocol.
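Building a custom image is the usual fix here, since the stock Prefect image doesn't bundle the GCS filesystem dependency. A minimal sketch (the base tag matches the one above; pinning a specific gcsfs version is optional):

```dockerfile
# Extend the stock Prefect image with the optional GCS dependency
FROM prefecthq/prefect:2.0.2-python3.10
RUN pip install --no-cache-dir gcsfs
```

Point your deployment's infrastructure at the resulting image instead of the stock one.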
Roger Webb
08/04/2022, 7:20 PM
DOW = Parameter("DOW", default="Monday")
FlowExecution = create_flow_run(
    flow_name=FlowName,
    project_name=ProjectName,
    task_args=dict(name="Flow Execution", trigger=all_successful),
    parameters={"StringA": "The Day of the week is " + DOW + "."}
)
AND
DOW = Parameter("DOW", default="Monday")
StringAField = "The Day of the week is " + DOW + "."
FlowExecution = create_flow_run(
    flow_name=FlowName,
    project_name=ProjectName,
    task_args=dict(name="Flow Execution", trigger=all_successful),
    parameters={"StringA": StringAField}
)
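The two forms above typically build the same thing in Prefect 1: a Parameter is a Task, and Tasks overload `+` to return a new deferred task rather than a string, so the concatenation happens at run time either way. A toy stdlib sketch of that deferral pattern (illustrative names, not Prefect's real classes):

```python
# Toy model of deferred operator overloading: "prefix" + node builds a
# graph node either inline or via an intermediate variable.
class Deferred:
    def __init__(self, fn):
        self.fn = fn

    def __add__(self, other):
        return Deferred(lambda ctx: self.run(ctx) + _run(other, ctx))

    def __radd__(self, other):
        return Deferred(lambda ctx: _run(other, ctx) + self.run(ctx))

    def run(self, ctx):
        return self.fn(ctx)

def _run(x, ctx):
    return x.run(ctx) if isinstance(x, Deferred) else x

DOW = Deferred(lambda ctx: ctx["DOW"])           # like Parameter("DOW")
inline = "The Day of the week is " + DOW + "."   # built now, evaluated later
via_var = "The Day of the week is " + DOW
via_var = via_var + "."

result_a = inline.run({"DOW": "Friday"})
result_b = via_var.run({"DOW": "Friday"})
```

Either spelling produces the same deferred computation, which is why the two snippets behave identically.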
Jonathan Pou
08/04/2022, 9:01 PM
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
import dask

coiled_executor = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 4,
        "software": "ttibi-dev",
        "shutdown_on_close": True,
        "worker_vm_types": ["r6a.large"],
    },
    adapt_kwargs={"maximum": 10},
)

@task
def some_data_manipulation():
    df = dask.datasets.timeseries(
        "2000", "2020", partition_freq="2w"
    ).persist()
    df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()
    return df

@flow(task_runner=coiled_executor)
def test_flow():
    some_data_manipulation.submit()

if __name__ == "__main__":
    test_flow()
Matt Delacour
08/04/2022, 9:19 PM

Chris Reuter
08/04/2022, 9:24 PM

Hafsa Junaid
08/04/2022, 11:08 PM

Viet Nguyen
08/05/2022, 1:51 AM

minhtuan
08/05/2022, 3:11 AM
Version: 2.0.3
from prefect.schedules import IntervalSchedule
ModuleNotFoundError: No module named 'prefect.schedules'
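For context: prefect.schedules is a 1.x module and no longer exists in 2.x. In 2.0.x the interval schedule class lives under the Orion schemas and gets attached to a deployment rather than a flow. Hedged sketch with a guarded import, since the exact path is version-dependent and worth verifying:

```python
from datetime import timedelta

try:
    # Prefect 2.0.x location (assumption: check against your installed version)
    from prefect.orion.schemas.schedules import IntervalSchedule
except ImportError:
    IntervalSchedule = None  # Prefect not installed, or the path moved again

# The interval itself is just a timedelta; the schedule object is what
# you attach to a deployment.
interval = timedelta(hours=24)
if IntervalSchedule is not None:
    schedule = IntervalSchedule(interval=interval)
```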
minhtuan
08/05/2022, 3:15 AM

Ha Pham
08/05/2022, 4:38 AM
Deployments are changing, and along with them the way you specify a schedule on a deployment. Stay tuned for updated guidance.
Currently it's not very clear to me how to set a schedule for my flows. Is it set in the flow code, or does it have to be set via the command line?

jcozar
08/05/2022, 7:42 AM

Hamza Naanani
08/05/2022, 9:11 AM
I ran prefect cloud login -k xxxxxxxxxxxxxxxx and I'm getting the following error: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:997). Is there a way to solve this?
I am using a Windows machine and tried setting CURL_CA_BUNDLE='' and PYTHONHTTPSVERIFY='false', but it didn't work.

Sven Aoki
08/05/2022, 9:35 AM

Ha Pham
08/05/2022, 9:53 AM

Bartosz Kopytek
08/05/2022, 10:18 AM
When I run prefect deployment create ./kubernetes-deployment.py I get this error:
No such command 'create'.
My prefect version is 2.0.0. Anyone know what might be the cause?

Milan Valadou
08/05/2022, 10:29 AM
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Input In [1], in <cell line: 6>()
3 secret_block = Secret.load("etlharvestqaspassword")
5 # Access the stored secret
----> 6 secret_block.get()
AttributeError: 'coroutine' object has no attribute 'get'
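That AttributeError is the classic sign of calling an async API without awaiting it: Secret.load(...) handed back a coroutine, and a coroutine object has no .get attribute. A stdlib-only sketch of the same failure mode (toy fetch function, not Prefect's API):

```python
import asyncio

async def fetch_secret() -> str:
    # Stand-in for an async client call
    return "shhh"

coro = fetch_secret()      # calling it only *creates* the coroutine
# coro.upper() here would raise AttributeError, just like coroutine.get() above

value = asyncio.run(coro)  # in a plain script; Jupyter already runs an
                           # event loop, so there use: await fetch_secret()
```

In a notebook, awaiting the call directly (e.g. `await secret_block.get()`) is usually the fix.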
I defined the block within the Orion UI, because when I tried to define it via code in a simple script (as suggested here), I get the following kind of error:
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '<http://ephemeral-orion/api/block_documents/>'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'name'], 'msg': 'name must only contain lowercase letters, numbers, and dashes', 'type': 'value_error'}, {'loc': ['body', '__root__'], 'msg': 'Names must be provided for block documents.', 'type': 'value_error'}], 'request_body': {'name': 'test2_password', 'data': {'value': 'test2'}, 'block_schema_id': '8019abd6-409a-4f91-9367-bc8343c31763', 'block_type_id': '29fb0ec8-f7e9-4527-984c-48f8675f2bc4', 'is_anonymous': False}}
For more information check: <https://httpstatuses.com/422>
I’m mostly using Prefect within a jupyter notebook and from within a virtualenv.
Thanks in advance to anyone who could point me to what's going on 🙂

Oscar Björhn
08/05/2022, 10:41 AM

Nikita Samoylov
08/05/2022, 11:05 AM

Milan Valadou
08/05/2022, 12:47 PM

Muddassir Shaikh
08/05/2022, 1:49 PM

Florian Guily
08/05/2022, 1:55 PM

Viet Nguyen
08/05/2022, 2:04 PM
I'm trying to use prefect-email to send a dummy test email (I know there are Prefect notification features, but I want to test prefect-email as well). I get this error every time; I hard-coded the password for the dummy test, so it can't be a wrong password 🤔
smtplib.SMTPAuthenticationError: (535, b'5.7.8 Username and Password not accepted. Learn more at\n5.7.8 <https://support.google.com/mail/?p=BadCredentials> d6-20020a170903230600b0016efc27ca98sm3023696plh.169 - gsmtp'
Thank you

Chu
08/05/2022, 2:17 PM

Evan Curtin
08/05/2022, 2:22 PM
I'm looking for a Result-like implementation for 2.0, but I can't find anything in the docs. The closest thing I'm seeing is FileSystems, but I don't see example usage of passing data between tasks using a custom persistence layer.

Tony Yun
08/05/2022, 3:35 PM
I'm getting file not found. So I cannot store it in /tmp and process it later in this task?

Keith
08/05/2022, 3:51 PM
I'm trying to use prefect-gcp to do it, but when I combine this with block information it seems like the info is not in the correct format.
gcs_block = GCS.load("gcs-dev")

@flow()
def example_cloud_storage_upload_blob_from_file_flow():
    gcp_credentials = GcpCredentials(service_account_info=gcs_block.service_account_info)
    test_upload_file = "test_upload.txt"
    blob = cloud_storage_upload_blob_from_file(test_upload_file, gcs_block.bucket_path, "test_upload.txt", gcp_credentials)
    return blob
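One mismatch to check in the snippet above: filesystem-style GCS blocks typically store service_account_info as a JSON string (often wrapped as a secret), while GcpCredentials wants the parsed dict. A stdlib sketch of the unwrapping step; the field types and the need to unwrap are assumptions to verify against your prefect/prefect-gcp versions:

```python
import json

# Stand-in for what a storage block might hand back: the service account
# key material as a JSON *string* rather than a dict.
raw_service_account_info = '{"type": "service_account", "project_id": "my-project"}'

# Parse it before passing to GcpCredentials(service_account_info=...),
# which expects a mapping of the key's fields.
info = json.loads(raw_service_account_info)
```

If the block stores the value as a SecretStr, you may also need to call its .get_secret_value() before parsing.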
Seth Goodman
08/05/2022, 4:31 PM
from prefect import Flow, task, apply_map

@task
def actual_task(arg1, arg2, arg3):
    # does stuff
    ...

task_list = [
    (1, "a", "b"),
    (2, "c", "d"),
    (3, "e", "f"),
    (4, "g", "h"),
]

def task_map(task):
    return actual_task(task[0], task[1], task[2])

with Flow("my_flow") as flow:
    task_results = apply_map(task_map, task_list)
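For reference, apply_map calls task_map once per element of task_list. If you would rather fan out with actual_task.map directly, the tuples first need to be unzipped into one list per argument, which is plain zip(*...) work:

```python
task_list = [
    (1, "a", "b"),
    (2, "c", "d"),
    (3, "e", "f"),
    (4, "g", "h"),
]

# Unzip rows into per-argument columns: map-style APIs generally want
# parallel lists, one per parameter.
arg1s, arg2s, arg3s = (list(col) for col in zip(*task_list))
```

With Prefect 1 that would look like actual_task.map(arg1s, arg2s, arg3s) inside the Flow block; either spelling produces one task run per tuple.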