Florian Kühnlenz
08/04/2022, 2:02 PM
Matt Delacour
08/04/2022, 2:25 PM
Tony Yun
08/04/2022, 3:18 PM
prefect.tasks.sftp.sftp
I’ve tried versions 1.20 and 2.0, but neither has this module available.
Matt Delacour
08/04/2022, 3:38 PM
Dylan McReynolds
08/04/2022, 3:46 PM
Chris Reuter
08/04/2022, 3:55 PM
Georvic Tur
08/04/2022, 3:56 PM
William Jamir
08/04/2022, 4:37 PM
Mike Kovetsky
08/04/2022, 4:40 PM
prefect kubernetes manifest orion to k8s_deployment.yaml
(please add the ability to set non-default k8s namespace)
• change PREFECT_API_URL from http://orion:4200/api to http://0.0.0.0:4200/api
• kubectl apply -f k8s_deployment.yaml
(it already has command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"] for the API container)
• check the GCP admin. The agent/ui/api seems to be healthy.
• kubectl port-forward deployment/orion 4200:4200
• check the UI is healthy via http://127.0.0.1:4200/.
• prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
• register GCS block via admin panel. name=prefect. path: gs://{my-path}. (prefect block register -f gcs_block.py didn’t work for me for some reason)
• prefect deployment build ../flows/backtest.py:backtest_flow --name dev --tag dev --infra kubernetes-job --storage-block gcs/prefect
(Should I run it from the root of the project?)
• checked my GCS bucket. The files did appear in the local folder and GCS.
• prefect deployment apply backtest_flow-deployment.yaml
• check UI http://127.0.0.1:4200/deployments. deployment did appear
• create the kubernetes queue manually. The name seems to be hardcoded, according to the error logs in the GCP agent. This step fixes the error.
• prefect deployment run Backtest/dev
• the flow run was created. But it is stuck in pending.
• The appropriate KubernetesJob was created, but it is stuck in an error state with the errors All connection attempts failed. and ConnectionRefusedError: [Errno 111] Connect call failed ('0.0.0.0', 4200).
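One possible reading of the ConnectionRefusedError above, offered as an assumption rather than a confirmed diagnosis: 0.0.0.0 is the unspecified "listen on all interfaces" address, so a job pod that receives PREFECT_API_URL=http://0.0.0.0:4200/api would look for the API inside its own pod. The sketch below uses only the Python standard library to flag such URLs. `is_routable_api_url` is a hypothetical helper name and is not part of Prefect.

```python
import ipaddress
from urllib.parse import urlparse


def is_routable_api_url(url: str) -> bool:
    """Return False when the URL's host is a wildcard or loopback address.

    Hypothetical helper for illustration; not part of Prefect.
    Hostnames are not resolved: only literal IPs and "localhost" are flagged.
    """
    host = urlparse(url).hostname
    if host is None:
        return False
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal, e.g. a Kubernetes service name such as "orion".
        return host != "localhost"
    return not (address.is_unspecified or address.is_loopback)


for candidate in (
    "http://0.0.0.0:4200/api",
    "http://127.0.0.1:4200/api",
    "http://orion:4200/api",
):
    print(candidate, is_routable_api_url(candidate))
```

Under that assumption, the service-name form from the generated manifest (http://orion:4200/api) is the one another pod could resolve, while 0.0.0.0 and 127.0.0.1 only make sense on the machine running the port-forward.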
Please help! Thank you in advance :)
William Jamir
08/04/2022, 5:09 PM
prefect agent start -t test
I can see that a new “Work Queue” is added to the UI (Agent queue test).
But if I start a new agent (in another tab) with the same command, the UI stays the same.
Is this the expected behavior? Would it be possible to add a --name parameter to the start command to allow starting multiple agents that consume the same tag?
aaron
08/04/2022, 6:09 PM
Chu
08/04/2022, 6:25 PM
with Flow block and the list it returns will be used in create_flow_run.map(parameters=...). My question is: once the flow is deployed (it runs every midnight), will the function we defined be called each time the Flow is scheduled to run?
Jai P
08/04/2022, 6:26 PM
Keith
08/04/2022, 6:40 PM
prefecthq/prefect:2.0.2-python3.10 and when trying to access GCS it appears that gcsfs is not installed, and I am getting the following error from my flow. Should I build a custom image that installs gcsfs?
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 232, in get_filesystem_class
    register_implementation(protocol, _import_class(bit["class"]))
  File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 255, in _import_class
    mod = importlib.import_module(mod)
  File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'gcsfs'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 312, in filesystem
    self._filesystem = fsspec.filesystem(scheme, **self.settings)
  File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 265, in filesystem
    cls = get_filesystem_class(protocol)
  File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 234, in get_filesystem_class
    raise ImportError(bit["err"]) from e
ImportError: Please install gcsfs to access Google Storage
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=None, local_path=".")
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 454, in get_directory
    return await self.filesystem.get_directory(
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 251, in get_directory
    return self.filesystem.get(from_path, local_path, recursive=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 315, in filesystem
    raise RuntimeError(
RuntimeError: File system created with scheme 'gcs' from base path 'gcs://<bucket>/deployments' could not be created. You are likely missing a Python module required to use the given storage protocol.
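The traceback points at a missing gcsfs module in the image. A minimal sketch of a check that could be run inside the container before deciding on a custom image, using only the standard library; `missing_modules` is a hypothetical helper name, and the module list is taken from the traceback above.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# gcsfs is the module named in the traceback; fsspec is what tries to load it.
print(missing_modules(["fsspec", "gcsfs"]))
```

An empty list would mean both modules are importable in that environment. A list containing gcsfs would match the ImportError shown above.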
Roger Webb
08/04/2022, 7:20 PM
DOW = Parameter("DOW", default="Monday")
FlowExecution = create_flow_run(
    flow_name=FlowName,
    project_name=ProjectName,
    task_args=dict(name="Flow Execution", trigger=all_successful),
    parameters={"StringA": "The Day of the week is " + DOW + "."}
)
AND
DOW = Parameter("DOW", default="Monday")
StringAField = "The Day of the week is " + DOW + "."
FlowExecution = create_flow_run(
    flow_name=FlowName,
    project_name=ProjectName,
    task_args=dict(name="Flow Execution", trigger=all_successful),
    parameters={"StringA": StringAField}
)
Jonathan Pou
08/04/2022, 9:01 PM
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
import dask

coiled_executor = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 4,
        "software": "ttibi-dev",
        "shutdown_on_close": True,
        "worker_vm_types": ["r6a.large"],
    },
    adapt_kwargs={"maximum": 10},
)

@task
def some_data_manipulation():
    df = dask.datasets.timeseries(
        "2000", "2020", partition_freq="2w"
    ).persist()
    df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()
    return df

@flow(task_runner=coiled_executor)
def test_flow():
    some_data_manipulation.submit()

if __name__ == "__main__":
    test_flow()
Matt Delacour
08/04/2022, 9:19 PM
Chris Reuter
08/04/2022, 9:24 PM
Hafsa Junaid
08/04/2022, 11:08 PM
Viet Nguyen
08/05/2022, 1:51 AM
minhtuan
08/05/2022, 3:11 AM
Version: 2.0.3
from prefect.schedules import IntervalSchedule
ModuleNotFoundError: No module named 'prefect.schedules'
minhtuan
08/05/2022, 3:15 AM
Ha Pham
08/05/2022, 4:38 AM
Deployments are changing, and along with them the way you specify a schedule on a deployment. Stay tuned for updated guidance.
Currently it's not very clear to me how to set a schedule for my flows. Is it set in the flow code, or does it have to be set via the command line?
jcozar
08/05/2022, 7:42 AM
Hamza Naanani
08/05/2022, 9:11 AM
prefect cloud login -k xxxxxxxxxxxxxxxx.
I'm getting the following error: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:997).
Is there a way to solve this?
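"unable to get local issuer certificate" generally means the certificate authority that signed the server's certificate is not in the trust store Python consults. A minimal sketch for inspecting that trust store with the standard library follows. Whether the Prefect client honors the environment variables it prints is an assumption to verify, not something confirmed in this thread.

```python
import os
import ssl

paths = ssl.get_default_verify_paths()

# Where this interpreter looks for CA certificates by default.
print("cafile:", paths.cafile)
print("capath:", paths.capath)

# Environment variables that OpenSSL consults to override those defaults.
for variable in (paths.openssl_cafile_env, paths.openssl_capath_env):
    print(variable, "=", os.environ.get(variable))
```

If cafile and capath are both empty or point at a bundle that lacks the relevant CA, that would be consistent with the error above.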
I am using a Windows machine and tried setting CURL_CA_BUNDLE='', PYTHONHTTPSVERIFY='false', but it didn't work.
Sven Aoki
08/05/2022, 9:35 AM
Ha Pham
08/05/2022, 9:53 AM
Bartosz Kopytek
08/05/2022, 10:18 AM
prefect deployment create ./kubernetes-deployment.py
I get this error:
No such command 'create'.
My Prefect version is 2.0.0.
Does anyone know what might be the cause?
Milan Valadou
08/05/2022, 10:29 AM
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Input In [1], in <cell line: 6>()
      3 secret_block = Secret.load("etlharvestqaspassword")
      5 # Access the stored secret
----> 6 secret_block.get()
AttributeError: 'coroutine' object has no attribute 'get'
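The error text suggests that Secret.load(...) returned a coroutine rather than a block, which would happen if the call is asynchronous in a context that already has a running event loop, such as a notebook (an assumption based on the message, not a confirmed diagnosis). The sketch below reproduces the same situation with a stand-in class. `FakeSecret` is hypothetical and is not Prefect's Secret.

```python
import asyncio


class FakeSecret:
    """Stand-in for a block class; hypothetical, not Prefect's Secret."""

    def __init__(self, value):
        self._value = value

    @classmethod
    async def load(cls, name):
        # Async classmethod: calling it returns a coroutine, not an instance.
        return cls(value=f"secret-for-{name}")

    def get(self):
        return self._value


async def main():
    pending = FakeSecret.load("etlharvestqaspassword")
    has_get = hasattr(pending, "get")  # the coroutine object has no .get
    block = await pending              # awaiting yields the instance
    return has_get, block.get()


print(asyncio.run(main()))
```

If the same applies to the real call, writing `await Secret.load(...)` in the notebook cell would be the analogous change.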
I defined the block within the Orion UI, because when I tried to define it via code in a simple script (as suggested here), I get the following kind of error:
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://ephemeral-orion/api/block_documents/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'name'], 'msg': 'name must only contain lowercase letters, numbers, and dashes', 'type': 'value_error'}, {'loc': ['body', '__root__'], 'msg': 'Names must be provided for block documents.', 'type': 'value_error'}], 'request_body': {'name': 'test2_password', 'data': {'value': 'test2'}, 'block_schema_id': '8019abd6-409a-4f91-9367-bc8343c31763', 'block_type_id': '29fb0ec8-f7e9-4527-984c-48f8675f2bc4', 'is_anonymous': False}}
For more information check: https://httpstatuses.com/422
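The 422 response states the naming rule directly: the name 'test2_password' contains an underscore, while only lowercase letters, numbers, and dashes are accepted. A minimal sketch of a local check follows. The regular expression is a reading of that message, not the server's actual pattern, and both function names are hypothetical.

```python
import re

# Rule quoted in the 422 response: lowercase letters, numbers, and dashes only.
VALID_BLOCK_NAME = re.compile(r"[a-z0-9-]+")


def is_valid_block_name(name: str) -> bool:
    return VALID_BLOCK_NAME.fullmatch(name) is not None


def to_block_name(name: str) -> str:
    """Lowercase the name and replace each run of other characters with a dash."""
    return re.sub(r"[^a-z0-9-]+", "-", name.lower()).strip("-")


print(is_valid_block_name("test2_password"))
print(to_block_name("test2_password"))
```

The name used in the notebook example, etlharvestqaspassword, already satisfies this rule, which is consistent with it having been created through the UI.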
I’m mostly using Prefect within a Jupyter notebook and from within a virtualenv.
Thanks in advance to anyone who could point me to what’s going on 🙂
Oscar Björhn
08/05/2022, 10:41 AM
Oscar Björhn
08/05/2022, 10:41 AM
Khuyen Tran
08/05/2022, 3:27 PM
Mason Menges
08/05/2022, 3:28 PM
Oscar Björhn
08/05/2022, 4:38 PM